Hive 大量数据导入分区表过慢的优化

kamisamak 发布于 2019-12-25 2043 次阅读


[dangerbox title="原加载数据代码"]

reduce 阶段先将数据写入 HDFS 上的临时目录，再逐个文件通过 SSH 执行 hive -e "load data inpath ..." 加载到对应分区。每个文件都要单独启动一次 Hive CLI，文件数量多时非常慢。

  /**
   * Loads every {@code year-month-day[.txt]} file under {@code /rua/networkqualityinfo} into the
   * matching partition of {@code rua.networkqualityinfo}. It does this by running
   * {@code hive -e "load data inpath ..."} over SSH, once per file.
   *
   * <p>Files whose names do not contain at least three '-'-separated fields, or that contain
   * characters which would break the shell/HiveQL quoting, are skipped with a message on stderr.
   * Previously such files aborted the whole run.
   */
  public static void main(String[] args) throws URISyntaxException, IOException, TaskExecFailException {
        final String dataDir = "/rua/networkqualityinfo";

        // Server that has the Hive CLI installed.
        // NOTE(review): credentials are hard-coded here; move them to args/configuration for real use.
        ConnBean connBean = new ConnBean("node01", "root", "123456");
        SSHExec sshExec = SSHExec.getInstance(connBean);
        sshExec.connect();
        try (FileSystem fs = FileSystem.get(new URI("hdfs://node01:8020"), new Configuration())) {
            for (FileStatus stat : fs.listStatus(new Path(dataDir))) {
                String fileName = stat.getPath().getName();
                String statement = buildLoadStatement(dataDir, fileName);
                if (statement == null) {
                    System.err.println("Skipping file with unexpected name: " + fileName);
                    continue;
                }
                // Each call starts a fresh Hive CLI; this per-file startup cost is why this approach is slow.
                sshExec.exec(new ExecCommand("hive -e \"" + statement + "\""));
            }
        } finally {
            // Release the SSH session even if listing or a load fails.
            sshExec.disconnect();
        }
    }

    /**
     * Builds the HiveQL LOAD statement for one data file.
     *
     * @param dir      HDFS directory containing the file (no trailing slash)
     * @param fileName file name of the form {@code year-month-day}, optionally ending in {@code .txt}
     * @return the statement, or {@code null} if the name has fewer than three '-'-separated fields
     *     or contains a quote, backtick, '$' or backslash (which would break the surrounding
     *     double-quoted shell string / single-quoted HiveQL literals)
     */
    private static String buildLoadStatement(String dir, String fileName) {
        for (char c : fileName.toCharArray()) {
            if (c == '\'' || c == '"' || c == '`' || c == '$' || c == '\\') {
                return null;
            }
        }
        // Strip only a trailing ".txt", not every occurrence of it.
        String base = fileName.endsWith(".txt")
                ? fileName.substring(0, fileName.length() - ".txt".length())
                : fileName;
        String[] sps = base.split("-");
        if (sps.length < 3) {
            return null;
        }
        return "load data inpath '" + dir + "/" + fileName + "' into table rua.networkqualityinfo partition(year='"
                + sps[0] + "',month='" + sps[1] + "',day='" + sps[2] + "')";
    }

[/dangerbox]

[dangerbox title="优化后"]

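我们可以在 reduce 中直接按 Hive 分区目录格式（year=.../month=.../day=...）把数据写入表在 HDFS 上的存储目录，最后执行一次 msck repair table 修复元数据，让 Hive 识别这些新分区，无需逐个文件执行 load。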

    /**
     * Writes all values for one date key directly into the Hive partition directory
     * {@code /user/hive/warehouse/rua.db/app_traffic/year=Y/month=M/day=D/}, one value per line.
     * This bypasses LOAD DATA; the new partitions must afterwards be registered with
     * {@code MSCK REPAIR TABLE}.
     *
     * <p>NOTE(review): this writes outside the job's OutputFormat, so speculative reduce attempts
     * could write the same file concurrently. Consider disabling reduce speculative execution.
     *
     * @param key     date key of the form {@code year-month-day}; only the first three fields are used
     * @param values  lines to write for that date
     * @param context reducer context; its configuration is used to open HDFS
     * @throws IOException if the key is malformed or the HDFS write fails
     */
    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
        String[] sps = key.toString().split("-");
        if (sps.length < 3) {
            throw new IOException("Expected key of the form year-month-day but got: " + key);
        }
        String year = sps[0];
        String month = sps[1];
        String day = sps[2];
        Path path = new Path("/user/hive/warehouse/rua.db/app_traffic/year=" + year + "/month=" + month + "/day=" + day + "/" + (year + "-" + month + "-" + day));

        // newInstance() instead of get(): get() returns a JVM-wide cached FileSystem that the task
        // itself may still be using, so closing it here could cause "Filesystem closed" errors.
        try (FileSystem fs = FileSystem.newInstance(URI.create("hdfs://192.168.100.101:8020"), context.getConfiguration());
             FSDataOutputStream out = fs.create(path)) {
            for (Text value : values) {
                // Text stores UTF-8; write its bytes directly. getBytes() may be longer than getLength().
                out.write(value.getBytes(), 0, value.getLength());
                out.write('\n');
            }
        }
    }
-- Run once after the job finishes so Hive registers the partition directories written above.
-- Qualified with the database because the data is written under rua.db.
msck repair table rua.app_traffic;

[/dangerbox]