Spark连接HBase
/**
 * One student record of the sample data set.
 *
 * @param name     student name
 * @param ScClass  class number
 * @param sex      sex
 * @param province province or city
 */
case class student(
  name: String,
  ScClass: Int,
  sex: String,
  province: String
)
// Create a table named student in HBase with a single column family, message.
def main(args: Array[String]): Unit = {
// Spark context running locally on all available cores; only warnings and above are logged.
val sparkConf = new SparkConf()
  .setAppName("wula")
  .setMaster("local[*]")
val sc = new SparkContext(sparkConf)
sc.setLogLevel("warn")
// Sample data: one record per line, four tab-separated fields per record.
val rdd1: RDD[String] = sc.parallelize("飞松\t3\t女\t山东省\n刚洁\t1\t男\t深圳市\n格格\t4\t女\t四川省\n谷菱\t5\t女\t河北省\n国立\t2\t男\t四川省\n海涛\t3\t男\t广东省\n含芙\t3\t女\t四川省\n华敏\t4\t女\t上海市\n乐和\t2\t男\t上海市\n乐家\t3\t男\t黑龙江\n乐康\t4\t男\t湖北省\n乐人\t5\t男\t四川省\n乐水\t3\t男\t北京市\n乐天\t4\t男\t河北省\n乐童\t5\t男\t江苏省\n乐贤\t1\t男\t陕西省\n乐音\t2\t男\t广东省\n李仁\t3\t男\t湖北省\n立涛\t3\t女\t陕西省\n凌青\t4\t女\t湖北省\n陆涛\t4\t男\t山东省\n媚媚\t5\t女\t河南省\n梦亿\t4\t男\t江苏省\n铭忠\t5\t男\t四川省\n慕梅\t3\t女\t北京市\n鹏吉\t1\t男\t上海市\n娉婷\t4\t女\t河南省\n淇峰\t2\t男\t广东省\n庆元\t3\t男\t上海市\n庆滋\t4\t男\t北京市\n丘东\t5\t男\t江苏省\n荣郑\t1\t男\t黑龙江\n蕊蕊\t5\t女\t四川省\n尚凯\t2\t男\t北京市\n诗涵\t1\t女\t河南省\n淑凤\t2\t女\t天津市\n淑娇\t3\t女\t上海市\n淑燕\t4\t女\t河北省\n淑怡\t4\t女\t广东省\n思璇\t2\t女\t湖北省\n苏华\t3\t女\t山东省\n苏梅\t4\t女\t四川省\n听荷\t5\t女\t深圳市\n文怡\t1\t女\t天津市\n文怡\t2\t女\t河北省\n香凝\t3\t女\t山东省\n翔云\t4\t女\t河南省\n小芸\t5\t女\t深圳市".split("\n"))
// Parse every line into a student: name, class (as Int), sex, province.
val rdd2: RDD[student] = rdd1.map { line =>
  val fields = line.split("\t")
  student(fields(0), fields(1).toInt, fields(2), fields(3))
}
[infobox title="创建表"]
// ------------------ Create the HBase table ------------------
// HBase client configuration: the ZooKeeper quorum of the cluster.
val conf = HBaseConfiguration.create()
conf.set("hbase.zookeeper.quorum", "node01:2181,node02:2181,node03:2181")
// Table name and column family. The name must be "student": that is the table the
// write (TableOutputFormat.OUTPUT_TABLE) and read (TableInputFormat.INPUT_TABLE)
// sections of this file address.
val stuTable = TableName.valueOf("student")
val tableDescr = new HTableDescriptor(stuTable)
tableDescr.addFamily(new HColumnDescriptor("message".getBytes))
val admin = new HBaseAdmin(conf)
try {
  // If the table already exists, disable it and delete it before re-creating it.
  if (admin.tableExists(stuTable)) {
    admin.disableTable(stuTable)
    admin.deleteTable(stuTable)
  }
  // Create the table with the single "message" column family.
  admin.createTable(tableDescr)
} finally {
  // Close the admin client even if one of the calls above throws.
  admin.close()
}
// ------------------ End of table creation ------------------
[/infobox]
[infobox title="HBase写入数据"]
// ------------------- Write to HBase -------------------
// HBase client configuration.
val conf = HBaseConfiguration.create()
conf.set("hbase.zookeeper.quorum", "node01:2181,node02:2181,node03:2181")
conf.set(TableInputFormat.INPUT_TABLE, "student")
// Build the RDD of (row key, Put) pairs that will be written to HBase.
// The student name is the row key; every field goes into the "message" family.
// Note that "class" is written from an Int, the other three columns from Strings.
val hRdd: RDD[(ImmutableBytesWritable, Put)] = rdd2.map { stu =>
  val family = Bytes.toBytes("message")
  val rowkey = Bytes.toBytes(stu.name)
  val put = new Put(rowkey)
  val columns = Seq(
    "name" -> Bytes.toBytes(stu.name),
    "class" -> Bytes.toBytes(stu.ScClass),
    "sex" -> Bytes.toBytes(stu.sex),
    "province" -> Bytes.toBytes(stu.province)
  )
  columns.foreach { case (qualifier, value) =>
    put.addColumn(family, Bytes.toBytes(qualifier), value)
  }
  (new ImmutableBytesWritable(rowkey), put)
}
// Output format and target table for the Hadoop job.
val jobConf = new JobConf(conf)
jobConf.setOutputFormat(classOf[TableOutputFormat])
jobConf.set(TableOutputFormat.OUTPUT_TABLE, "student")
// Write the RDD to HBase.
hRdd.saveAsHadoopDataset(jobConf)
// ------------------- End of HBase write -------------------
[/infobox]
[infobox title="HBase读取数据"]
// ------------------- Read from HBase -------------------
// HBase client configuration, including the table to scan.
val conf = HBaseConfiguration.create()
conf.set("hbase.zookeeper.quorum", "node01:2181,node02:2181,node03:2181")
conf.set(TableInputFormat.INPUT_TABLE, "student")
// Read the table configured in conf as an RDD of (ImmutableBytesWritable, Result) pairs.
val hbaseRDD: RDD[(ImmutableBytesWritable, Result)] = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat], classOf[ImmutableBytesWritable], classOf[Result])
// Iterate over every cell of every row and print it as rowKey--family--qualifier--value.
hbaseRDD.foreach { case (_, result) =>
  result.rawCells().foreach { cell =>
    val rowKey = Bytes.toString(CellUtil.cloneRow(cell))
    val family = Bytes.toString(CellUtil.cloneFamily(cell))
    val qualifier = Bytes.toString(CellUtil.cloneQualifier(cell))
    val rawValue = CellUtil.cloneValue(cell)
    // The write section stores the "class" column with Bytes.toBytes(Int), i.e. as a
    // 4-byte binary integer rather than text, so it has to be decoded with Bytes.toInt;
    // decoding those bytes as a string would print unreadable characters.
    val value =
      if (qualifier == "class" && rawValue.length == Bytes.SIZEOF_INT) Bytes.toInt(rawValue).toString
      else Bytes.toString(rawValue)
    println(rowKey + "--" + family + "--" + qualifier + "--" + value)
  }
}
// ------------------- End of HBase read -------------------
[/infobox]
// Stop the SparkContext.
sc.stop()
Spark连接MySQL
/** Opens a new JDBC connection to the MySQL database `test` on node01. */
def getConn(): Connection = {
  val url = "jdbc:mysql://node01:3306/test?characterEncoding=UTF-8"
  val username = "root"
  val password = "123456"
  DriverManager.getConnection(url, username, password)
}
/**
 * One user record as written to and read from the MySQL `user` table.
 *
 * @param name     user name
 * @param birthday date of birth
 * @param sex      sex
 * @param address  address
 */
case class user(
  name: String,
  birthday: Date,
  sex: String,
  address: String
)
// Spark context running locally on all available cores; only warnings and above are logged.
val sparkConf = new SparkConf().setAppName("wula").setMaster("local[*]")
val sc = new SparkContext(sparkConf)
sc.setLogLevel("warn")
// SimpleDateFormat pattern letters are case-sensitive: "MM" is month-in-year while
// "mm" is minute-in-hour. With "mm" the month of the input was parsed as minutes and
// every birthday ended up in January.
// df parses the input data (yyyy/M/d); dfs renders the yyyy-MM-dd form that
// java.sql.Date.valueOf expects.
val df = new SimpleDateFormat("yyyy/MM/dd")
val dfs = new SimpleDateFormat("yyyy-MM-dd")
// Sample data: "name birthday sex address", separated by single spaces.
val personlist = List("安荷 1998/2/7 女 江苏省", "白秋 2000/3/7 女 天津市", "雪莲 1998/6/7 女 湖北省", "宾白 1999/7/3 男 河北省", "宾实 2000/8/7 男 河北省", "斌斌 1998/3/7 男 江苏省")
val users: List[user] = personlist.map { line =>
  val fields = line.split(" ")
  user(fields(0), df.parse(fields(1)), fields(2), fields(3))
}
// Store the data in MySQL.
// JDBC settings: open the connection used by the create-table and insert sections.
val conn: Connection = DriverManager.getConnection("jdbc:mysql://node01:3306/test?characterEncoding=UTF-8", "root", "123456")
[infobox title="mysql建表"]
// --------------------- Create the MySQL table ---------------------
val sql = "CREATE TABLE `user` (\n `id` int(11) NOT NULL AUTO_INCREMENT,\n `username` varchar(32) NOT NULL COMMENT '用户名称',\n `birthday` date DEFAULT NULL COMMENT '生日',\n `sex` char(1) DEFAULT NULL COMMENT '性别',\n `address` varchar(256) DEFAULT NULL COMMENT '地址',\n PRIMARY KEY (`id`)\n) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;"
// Execute the DDL statement and close the statement afterwards, also when execute() throws.
val createStmt = conn.prepareStatement(sql)
try {
  createStmt.execute()
} finally {
  createStmt.close()
}
[/infobox]
[infobox title="mysql写入数据"]
// --------------------- Write rows to MySQL ---------------------
// One parameterised INSERT, prepared once and reused for every user.
val insertSql = "INSERT INTO `user` (`id`, `username`,`birthday`, `sex`, `address`) VALUES (Null,?,?,?,?);"
try {
  val ps: PreparedStatement = conn.prepareStatement(insertSql)
  try {
    users.foreach { data =>
      // Bind the values of one user and queue the row in the batch.
      ps.setString(1, data.name)
      // java.sql.Date.valueOf expects the yyyy-[m]m-[d]d text form, which dfs produces.
      ps.setDate(2, java.sql.Date.valueOf(dfs.format(data.birthday)))
      ps.setString(3, data.sex)
      ps.setString(4, data.address)
      ps.addBatch()
    }
    // Send all queued rows. The statement has to be visible here, which is why it is
    // created outside the loop.
    ps.executeBatch()
  } finally {
    ps.close()
  }
} finally {
  // The connection is no longer needed after the insert; close it even on failure.
  conn.close()
}
[/infobox]
[infobox title="MySql数据查询"]
// --------------------- Query MySQL ---------------------
// Selects the rows of the user table with 0 <= id <= 60 (the two "?" placeholders are
// bound from the 0 and 60 arguments), using 2 as the partition count, and maps every
// result row to a user(name, birthday, sex, address).
val userRdd: JdbcRDD[(user)] = new JdbcRDD(
  sc,
  getConn,
  "select * from user where id >= ? and id <= ?;",
  0,
  60,
  2,
  rs => user(
    rs.getString("username"),
    rs.getDate("birthday"),
    rs.getString("sex"),
    rs.getString("address")
  )
)
// Print every user that was read.
userRdd.foreach(println)
[/infobox]
// Stop the SparkContext.
sc.stop()
文章评论