Java API如何实现向Hive批量导入数据

Java API实现向Hive批量导入数据

Java程序中产生的数据,如果导入oracle或者mysql库,可以通过jdbc连接insert批量操作完成,但是当前版本的hive并不支持批量insert操作,因为需要先将结果数据写入hdfs文件,然后插入Hive表中。

package com.enn.idcard;
import java.io.IOException;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
/**
 * <p>Description: </p>
 * @author kangkaia
 * @date 2017年12月26日 下午1:42:24
 */
public class HiveJdbc {
    public static void main(String[] args) throws IOException {
    	List<List> argList = new ArrayList<List>();
		List<String> arg = new ArrayList<String>();
		arg.add("12345");
		arg.add("m");
		argList.add(arg);
		arg = new ArrayList<String>();
		arg.add("54321");
		arg.add("f");
		argList.add(arg);
//		System.out.println(argList.toString());
		String dst = "/test/kk.txt";
		createFile(dst,argList);
		loadData2Hive(dst);
    }

    /**
     * 将数据插入hdfs中,用于load到hive表中,默认分隔符是"\001"
     * @param dst
     * @param contents
     * @throws IOException
     */
    public static void createFile(String dst , List<List> argList) throws IOException{
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);
        Path dstPath = new Path(dst); //目标路径
        //打开一个输出流
        FSDataOutputStream outputStream = fs.create(dstPath);
        StringBuffer sb = new StringBuffer();
        for(List<String> arg:argList){
			for(String value:arg){
				sb.append(value).append("\001");
			}
			sb.deleteCharAt(sb.length() - 4);//去掉最后一个分隔符
			sb.append("\n");
		}
        sb.deleteCharAt(sb.length() - 2);//去掉最后一个换行符
        byte[] contents =  sb.toString().getBytes();
        outputStream.write(contents);
        outputStream.close();
        fs.close();
        System.out.println("文件创建成功!");
    }
    /**
     * 将HDFS文件load到hive表中
     * @param dst
     */
    public static void loadData2Hive(String dst) {
    	String JDBC_DRIVER = "org.apache.hive.jdbc.HiveDriver";
    	String CONNECTION_URL = "jdbc:hive2://server-13:10000/default;auth=noSasl";
    	String username = "admin";
        String password = "admin";
        Connection con = null;

		try {
			Class.forName(JDBC_DRIVER);
			con = (Connection) DriverManager.getConnection(CONNECTION_URL,username,password);
			Statement stmt = con.createStatement();
			String sql = " load data inpath '"+dst+"' into table population.population_information ";

			stmt.execute(sql);
			System.out.println("loadData到Hive表成功!");
		} catch (SQLException e) {
			e.printStackTrace();
		} catch (ClassNotFoundException e) {
			e.printStackTrace();
		}finally {
			// 关闭rs、ps和con
			if(con != null){
				try {
					con.close();
				} catch (SQLException e) {
					e.printStackTrace();
				}
			}
		}
	}
}

注意:

本例使用mvn搭建,conf配置文件放在src/main/resources目录下。

Hive提供的默认文件存储格式有textfile、sequencefile、rcfile等。用户也可以通过实现接口来自定义输入输的文件格式。

在实际应用中,textfile由于无压缩,磁盘及解析的开销都很大,一般很少使用。Sequencefile以键值对的形式存储的二进制的格式,其支持针对记录级别和块级别的压缩。rcfile是一种行列结合的存储方式(text file和sequencefile都是行表[row table]),其保证同一条记录在同一个hdfs块中,块以列式存储。一般而言,对于OLTP而言,行表优势大于列表,对于OLAP而言,列表的优势大于行表,特别容易想到当做聚合操作时,列表的复杂度将会比行表小的多,虽然单独rcfile的列运算不一定总是存在的,但是rcfile的高压缩率确实减少文件大小,因此实际应用中,rcfile总是成为不二的选择,达观数据平台在选择文件存储格式时也大量选择了rcfile方案。

通过hdfs导入hive的表默认是textfile格式的,因此可以改变存储格式,具体方法是先创建sequencefile、rcfile等格式的空表,然后重新插入数据即可。

insert overwrite table seqfile_table select * from textfile_table;
……
insert overwrite table rcfile_table select * from textfile_table;

java 批量插入hive中转在HDFS

稍微修改了下,这文章是通过将数据存盘后,加载到HIVE.

模拟数据放到HDFS然后加载到HIVE,请大家记得添加HIVE JDBC依赖否则会报错。

加载前的数据表最好用外部表,否则会drop表的时候元数据会一起删除!

  <dependency>
   <groupId>org.apache.hive</groupId>
   <artifactId>hive-jdbc</artifactId>
   <version>1.1.0</version>
  </dependency>

代码

import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
public class Demo {
	    public static void main(String[] args) throws Exception {
	    	List<List> argList = new ArrayList<List>();
			List<String> arg = new ArrayList<String>();
			arg.add("12345");
			arg.add("m");
			argList.add(arg);
			arg = new ArrayList<String>();
			arg.add("54321");
			arg.add("f");
			argList.add(arg);
//			System.out.println(argList.toString());
			String dst = "/test/kk.txt";
			createFile(dst,argList);
//			loadData2Hive(dst);
	    }
	    /**
	     * 将数据插入hdfs中,用于load到hive表中,默认分隔符是"|"
	     * @param dst
	     * @param contents
	     * @throws IOException
	     * @throws Exception
	     * @throws InterruptedException
	     */
	    public static void createFile(String dst , List<List> argList) throws IOException, InterruptedException, Exception{
	        Configuration conf = new Configuration();
			FileSystem fs = FileSystem.get(new URI("hdfs://hadoop:9000"),conf,"root");
	        Path dstPath = new Path(dst); //目标路径
	        //打开一个输出流
	        FSDataOutputStream outputStream = fs.create(dstPath);
	        StringBuffer sb = new StringBuffer();
	        for(List<String> arg:argList){
				for(String value:arg){
					sb.append(value).append("|");
				}
				sb.deleteCharAt(sb.length() - 1);//去掉最后一个分隔符
				sb.append("\n");
			}
	        byte[] contents =  sb.toString().getBytes();
	        outputStream.write(contents);
			outputStream.flush();;
	        outputStream.close();
	        fs.close();
	        System.out.println("文件创建成功!");

	    }
	    /**
	     * 将HDFS文件load到hive表中
	     * @param dst
	     */
	    public static void loadData2Hive(String dst) {
	    	String JDBC_DRIVER = "org.apache.hive.jdbc.HiveDriver";
	    	String CONNECTION_URL = "jdbc:hive2://hadoop:10000/default";
	    	String username = "root";
	        String password = "root";
	        Connection con = null;

			try {
				Class.forName(JDBC_DRIVER);
				con = (Connection) DriverManager.getConnection(CONNECTION_URL,username,password);
				Statement stmt = con.createStatement();

				String sql = " load data inpath '"+dst+"' into table test ";//test 为插入的表

				stmt.execute(sql);
				System.out.println("loadData到Hive表成功!");
			} catch (SQLException e) {
				e.printStackTrace();
			} catch (ClassNotFoundException e) {
				e.printStackTrace();
			}finally {
				// 关闭rs、ps和con
				if(con != null){
					try {
						con.close();
					} catch (SQLException e) {
						e.printStackTrace();
					}
				}
			}
		}

	}

以上为个人经验,希望能给大家一个参考,也希望大家多多支持我们。

时间: 2021-07-21

SpringBoot连接Hive实现自助取数的示例

原文链接: http://www.ikeguang.com/?p=815 公司运营免不了让我们数据做一些临时取数,这些取数有时候是重复的,或者可以做成可配置的.需要开发成界面,供他们选择,自然想到SpringBoot连接Hive,可以把取数做成一键生成,或者让他们自己写sql,通常大多人是不会sql的. 1. 需要的依赖配置 为了节省篇幅,这里给出hiveserver2方式连接hive主要的maven依赖,父工程springboot依赖省略. <!-- 版本信息 --> <propert

java中栈和队列的实现和API的用法(详解)

在java中要实现栈和队列,需要用到java集合的相关知识,特别是Stack.LinkedList等相关集合类型. 一.栈的实现 栈的实现,有两个方法:一个是用java本身的集合类型Stack类型:另一个是借用LinkedList来间接实现Stack. 1.Stack实现 直接用Stack来实现非常方便,常用的api函数如下: boolean        isEmpty() // 判断当前栈是否为空 synchronized E        peek() //获得当前栈顶元素 synchro

基于Java8 Stream API实现数据抽取收集

目标&背景 我们以"处理订单数据"为例,假设我们的应用是一个分布式应用,有"订单应用","物流应用","商品应用"等都是独立的服务.本次我们的目的需要展示订单列表完整数据: 1.查询订单列表. 2.批量查询物流信息. 3.将物流信息填充到订单主信息中. 假设我们定义了一个订单类,具有几个关键的属性:订单号,状态,订单价,快递信息.如下所示: class Order{ String orderSeq; String st

Java实现后台发送及接收json数据的方法示例

本文实例讲述了Java实现后台发送及接收json数据的方法.分享给大家供大家参考,具体如下: 本篇博客试用于编写java后台接口以及两个项目之间的接口对接功能: 具体的内容如下: 1.java后台给指定接口发送json数据 package com.utils; import java.io.BufferedReader; import java.io.InputStreamReader; import java.io.OutputStreamWriter; import java.net.Htt

hive从mysql导入数据量变多的解决方案

原始导数命令: bin/sqoop import -connect jdbc:mysql://192.168.169.128:3306/yubei -username root -password 123456 -table yl_city_mgr_evt_info --split-by rec_id -m 4 --fields-terminated-by "\t" --lines-terminated-by "\n" --hive-import --hive-ov

MYSQL 导入数据的几种不同

Source C:\\sql.txt ; // 這種是用來執行標准的SQL 語句. for Example : insert into a(A,b,c) Values (1,2,3); LoadData C:\\data.txt MYSQL 導入數據的幾種不同// 這種是MSSQL用BCP生成的數據差不多.

MySQL 4种导入数据的方法

1.mysql 命令导入 使用 mysql 命令导入语法格式为: mysql -u用户名 -p密码 < 要导入的数据库数据(runoob.sql) 实例: # mysql -uroot -p123456 < runoob.sql 以上命令将将备份的整个数据库 runoob.sql 导入. 2.source 命令导入 source 命令导入数据库需要先登录到数库终端: mysql> create database abc; # 创建数据库 mysql> use abc; # 使用已创

MySQL如何快速导入数据

前言: 日常学习和工作中,经常会遇到导数据的需求.比如数据迁移.数据恢复.新建从库等,这些操作可能都会涉及大量数据的导入.有时候导入进度慢,电脑风扇狂转真的很让人崩溃,其实有些小技巧是可以让导入更快速的,本篇文章笔者会谈一谈如何快速的导入数据. 注:本篇文章只讨论如何快速导入由逻辑备份产生的SQL脚本,其他文件形式暂不讨论. 1.尽量减小导入文件大小 首先给个建议,导出导入数据尽量使用MySQL自带的命令行工具,不要使用Navicat.workbench等图形化工具.特别是大数据量的时候,用My

PHP上传Excel文件导入数据到MySQL数据库示例

最近在做Excel文件导入数据到数据库.网站如果想支持批量插入数据,可以制作一个上传Excel文件,导入里面的数据内容到MySQL数据库的小程序. 要用到的工具: ThinkPHP:轻量级国产PHP开发框架.可在ThinkPHP官网下载. PHPExcel:Office Excel 文档的一个PHP类库,它基于微软的OpenXML标准和PHP语言.可在CodePlex官网下载.. 1.设计MySQL数据库product 创建product数据库 CREATE DATABASE product D

php 在线导入mysql大数据程序

php 在线导入 mysql 大数据程序 <?php header("content-type:text/html;charset=utf-8"); error_reporting(E_ALL); set_time_limit(0); $file='./test.sql'; $data=file($file); echo "<pre>"; //print_r($data); $data_new=array(); $tmp=array(); fore

mysql导入导出数据中文乱码解决方法小结

linux系统中 linux默认的是utf8编码,而windows是gbk编码,所以会出现上面的乱码问题. 解决mysql导入导出数据乱码问题 首先要做的是要确定你导出数据的编码格式,使用mysqldump的时候需要加上--default-character-set=utf8, 例如下面的代码: 复制代码 代码如下: mysqldump -uroot -p --default-character-set=utf8 dbname tablename > bak.sql 那么导入数据的时候也要使用-

MySQL中数据导入恢复的简单教程

有两个简单的方法MySQL中的数据加载到MySQL数据库从先前备份的文件. LOAD DATA导入数据: MySQL提供了LOAD DATA语句,作为一个大容量数据加载.下面是一个例子声明中,读取一个文件dump.txt,,从当前目录加载到当前数据库中的表mytbl: mysql> LOAD DATA LOCAL INFILE 'dump.txt' INTO TABLE mytbl; 如果本地的关键字是不存在的,MySQL的外观使用绝对路径名寻找到完全指定位置的文件在服务器主机上的数据文件,从文

分析Mysql大量数据导入遇到的问题以及解决方案

在项目中,经常会碰到往数据库中导入大量数据,以便利用sql进行数据分析.在导入数据的过程中会碰到一些需要解决的问题,这里结合导入一个大约4G的txt数据的实践,把碰到的问题以及解决方法展现出来,一方面自己做个总结记录,另一方面希望对那些碰到相同问题的朋友有个参考. 我导入的数据是百科的txt文件,文件大小有4G多,数据有6500万余条,每条数据通过换行符分隔.每条数据包含三个字段,字段之间通过Tab分隔.将数据取出来的方法我采用的是用一个TripleData类来存放这三个字段,字段都用Strin

Java利用MYSQL LOAD DATA LOCAL INFILE实现大批量导入数据到MySQL

Mysql load data的使用 数据库中,最常见的写入数据方式是通过SQL INSERT来写入,另外就是通过备份文件恢复数据库,这种备份文件在MySQL中是SQL脚本,实际上执行的还是在批量INSERT语句. 在实际中,常常会遇到两类问题:一类是数据导入,比如从word.excel表格或者txt文档导入数据(这些数据一般来自于非技术人员通过OFFICE工具录入的文档):一类数据交换,比如从MySQL.Oracle.DB2数据库之间的数据交换. 这其中就面临一个问题:数据库SQL脚本有差异,

MySQL 表数据的导入导出操作示例

本文实例讲述了MySQL 表数据的导入导出操作.分享给大家供大家参考,具体如下: 数据导出 1.  使用 SELECT ...INTO OUTFILE ...命令来导出数据,具体语法如下. mysql> SELECT * FROM tablename INTO OUTFILE 'target_file' [option]; 其中 option 参数可以是以下选项: FIELDS TEMINATED BY 'string' (字符分断符) FIELDS [OPTIONALLY] ENCLOSED