[dangerbox title="原加载数据代码"]
原方案：reduce 先将数据写入 HDFS（/rua/networkqualityinfo），再通过 SSH 执行 hive -e "load data inpath ..." 将每个文件逐一加载到对应的 year/month/day 分区。
/**
 * Loads every daily file under {@code /rua/networkqualityinfo} into the matching
 * {@code year/month/day} partition of {@code rua.networkqualityinfo} by running
 * {@code hive -e "load data inpath ..."} on node01 over SSH.
 *
 * <p>File names are expected to look like {@code <year>-<month>-<day>[.txt]}; the ".txt"
 * suffix is stripped before splitting on '-'. Entries that are directories, that lack three
 * non-empty '-'-separated parts, or that contain characters outside {@code [A-Za-z0-9._-]}
 * are skipped with a warning instead of aborting the run, because the name is spliced
 * into a shell command.
 *
 * @param args unused
 * @throws URISyntaxException if the hard-coded HDFS URI is malformed
 * @throws IOException if connecting to or listing HDFS fails
 * @throws TaskExecFailException if the SSH library fails to run a command
 */
public static void main(String[] args) throws URISyntaxException, IOException, TaskExecFailException {
    // NOTE(review): credentials are hard-coded; consider reading them from config/env.
    ConnBean connBean = new ConnBean("node01", "root", "123456");
    SSHExec sshExec = SSHExec.getInstance(connBean);
    sshExec.connect();
    try {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(new URI("hdfs://node01:8020"), conf);
        try {
            for (FileStatus stat : fs.listStatus(new Path("/rua/networkqualityinfo"))) {
                loadIntoPartition(sshExec, stat);
            }
        } finally {
            fs.close();
        }
    } finally {
        // Always release the SSH session, even if listing HDFS or a load throws.
        sshExec.disconnect();
    }
}

/** Runs one {@code load data} statement for a single HDFS entry, or skips it if its name is unusable. */
private static void loadIntoPartition(SSHExec sshExec, FileStatus stat) throws TaskExecFailException {
    Path path = stat.getPath();
    String name = path.getName();
    String[] sps = name.replace(".txt", "").split("-");
    boolean usable = !stat.isDirectory()
            && sps.length >= 3
            && !sps[0].isEmpty() && !sps[1].isEmpty() && !sps[2].isEmpty()
            // Whitelist: the name ends up inside quotes of a shell + HQL command.
            && name.matches("[A-Za-z0-9._-]+");
    if (!usable) {
        System.err.println("Skipping unexpected entry: " + path);
        return;
    }
    String year = sps[0];
    String month = sps[1];
    String day = sps[2];
    ExecCommand execCommand = new ExecCommand("hive -e \"load data inpath '/rua/networkqualityinfo/" + name
            + "' into table rua.networkqualityinfo partition(year='" + year + "',month='" + month
            + "',day='" + day + "')\"");
    Result result = sshExec.exec(execCommand);
    if (!result.isSuccess) {
        System.err.println("Failed to load " + path + ": " + result.error_msg);
    }
}
[/dangerbox]
[dangerbox title="优化后"]
优化后：在 reduce 中直接按 Hive 分区目录格式（year=…/month=…/day=…）把数据写入 HDFS 上的 Hive 表目录，再执行 msck repair table 修复（注册）分区元数据，省去逐个文件 load 的步骤。
/**
 * Writes all values for one date key straight into the Hive partition directory of
 * {@code rua.app_traffic}. Afterwards the data only needs {@code msck repair table}
 * instead of a separate {@code load data} step.
 *
 * <p>The key is expected to be {@code <year>-<month>-<day>}. Each value is written as one
 * line to {@code .../app_traffic/year=Y/month=M/day=D/Y-M-D}, overwriting any existing file.
 *
 * @param key     date key in {@code year-month-day} form
 * @param values  lines belonging to that date
 * @param context task context; its configuration is used to open HDFS
 * @throws IOException if the key is malformed or writing to HDFS fails
 */
@Override
protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
    String[] sps = key.toString().split("-");
    if (sps.length < 3) {
        throw new IOException("Expected key of the form year-month-day but got: " + key);
    }
    String year = sps[0];
    String month = sps[1];
    String day = sps[2];
    Path path = new Path("/user/hive/warehouse/rua.db/app_traffic/year=" + year + "/month=" + month
            + "/day=" + day + "/" + (year + "-" + month + "-" + day));
    // newInstance (not get): get() returns a JVM-wide cached FileSystem, and closing it would
    // break other users in this task. A private instance can be closed safely.
    // NOTE(review): this address differs from the hdfs://node01:8020 used elsewhere — confirm.
    try (FileSystem fs = FileSystem.newInstance(URI.create("hdfs://192.168.100.101:8020"), context.getConfiguration());
         FSDataOutputStream out = fs.create(path)) {
        for (Text value : values) {
            // Write Text's own UTF-8 bytes directly; only the first getLength() bytes are valid.
            out.write(value.getBytes(), 0, value.getLength());
            out.write('\n');
        }
    }
}
-- Register the year=/month=/day= directories written under rua.db/app_traffic as partitions.
-- Qualified with the database so it works regardless of the session's current database.
msck repair table rua.app_traffic;
[/dangerbox]
Comments NOTHING