/* * To change this license header, choose License Headers in Project Properties. * To change this template file, choose Tools | Templates * and open the template in the editor. */ package com.pucpr.atp.fundbigdata; import java.io.IOException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; /** * Classe que implementa o MapReduce para a etapa 2 da ATP * * Informação: Mercadoria com maior quantidade de transações financeiras em * 2016, no Brasil (como a base de dados está em inglês, utilize Brazil) * * @author bruno.saragosa */ public class Implementacao6 { public static class MapperImplementacao6 extends Mapper { @Override public void map(Object chave, Text valor, Context context) throws IOException, InterruptedException { String linha = valor.toString(); String[] campos = linha.split(";"); if (campos.length == 10 && campos[1].equals("2016") && campos[0].equals("Brazil")) { String mercadoria = campos[3]; int ocorrencia = 1; Text chaveMap = new Text(mercadoria); IntWritable valorMap = new IntWritable(ocorrencia); context.write(chaveMap, valorMap); } } } public static class ReducerImplementacao6 extends Reducer { @Override public void reduce(Text chave, Iterable valores, Context context) throws IOException, InterruptedException { int soma = 0; for (IntWritable val : valores) { soma += val.get(); } IntWritable valorSaida = new IntWritable(soma); context.write(chave, valorSaida); } } public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException { String arquivoEntrada = "/home/Disciplinas/FundamentosBigData/OperacoesComerciais/base_100_mil.csv"; String arquivoSaida = "/home2/ead2021/SEM1/bruno.saragosa/bruno.saragosa/implementacao6"; if (args.length == 2) { arquivoEntrada = args[0]; arquivoSaida = args[1]; } Configuration conf = new Configuration(); Job job = Job.getInstance(conf, "atp01-imp01"); job.setJarByClass(Implementacao6.class); job.setMapperClass(MapperImplementacao6.class); job.setReducerClass(ReducerImplementacao6.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); FileInputFormat.addInputPath(job, new Path(arquivoEntrada)); FileOutputFormat.setOutputPath(job, new Path(arquivoSaida)); job.waitForCompletion(true); } }