根据提示,在右侧编辑器补充代码,对数据按照一定规则进行清洗。
数据说明如下: a.txt
数据切分方式:英文逗号(`,`)
数据所在位置:/user/test/input/a.txt
15733218050,15778423030,1542457633,1542457678,450000,530000
15733218050 | 15778423030 | 1542457633 | 1542457678 | 450000 | 530000 |
---|---|---|---|---|---|
呼叫者手机号 | 接收者手机号 | 开始时间戳(s) | 结束时间戳(s) | 呼叫者地址省份编码 | 接收者地址省份编码 |
Mysql
数据库:
用户名:root
密码:123123
数据库名:mydb
用户表:userphone
列名 | 类型 | 非空 | 是否自增 | 介绍 |
---|---|---|---|---|
id | int(11) | √ | √ | 用户ID |
phone | varchar(255) | | | 手机号 |
trueName | varchar(255) | | | 真实姓名 |
地址省份表:allregion
列名 | 类型 | 非空 | 是否自增 | 介绍 |
---|---|---|---|---|
id | int(11) | √ | √ | 用户ID |
CodeNum | varchar(255) | | | 编号 |
Address | varchar(255) | | | 地址 |
清洗规则:
- 处理数据中的时间戳(秒级),将其转化为 `2017-06-21 07:01:58`(年-月-日 时:分:秒)这种格式;
- 处理数据中的省份编码,结合 mysql 的表数据对应,将其转换成省份名称;
- 处理用户手机号,与 mysql 的表数据对应,关联用户的真实姓名;
- 处理数据中的开始时间与结束时间并计算通信时长(以秒为单位);
- 设置数据来源文件路径及清洗后的数据存储路径:数据来源路径为 `/user/test/input/a.txt`(HDFS);清洗后的数据存放于 `/user/test/output`(HDFS)。
数据清洗后如下:
邓二,张倩,13666666666,15151889601,2018-03-29 10:58:12,2018-03-29 10:58:42,30,黑龙江省,上海市
邓二 | 张倩 | 13666666666 | 15151889601 | 2018-03-29 10:58:12 | 2018-03-29 10:58:42 | 30 | 黑龙江省 | 上海市 |
---|---|---|---|---|---|---|---|---|
用户名A | 用户名B | 用户A的手机号 | 用户B的手机号 | 开始时间 | 结束时间 | 通信时长(s) | 用户A地址省份 | 用户B地址省份 |
step/com/LogMR.java
package com;

import java.io.IOException;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * Map-only MapReduce job that cleans comma-separated call records.
 *
 * <p>Each input line has six fields: caller phone, callee phone, start timestamp (seconds),
 * end timestamp (seconds), caller province code, callee province code. Each well-formed line is
 * turned into a {@link PhoneLog} carrying the users' real names, both phone numbers, formatted
 * start/end times, the call duration in seconds and the province names.
 */
public class LogMR {

    /**
     * Mapper that enriches every call record with data from the {@code userphone} and
     * {@code allregion} MySQL tables, which are loaded into memory once per task in
     * {@link #setup}.
     */
    static class MyMapper extends Mapper<LongWritable, Text, PhoneLog, NullWritable> {
        /** Counter group/name used to report input lines that were skipped. */
        private static final String COUNTER_GROUP = "LogMR";
        private static final String MALFORMED_COUNTER = "MALFORMED_RECORDS";

        /** Phone number -> real name (columns 2 and 3 of {@code userphone}). */
        Map<String, String> userMap = new HashMap<>();
        /** Province code -> province name (columns 2 and 3 of {@code allregion}). */
        Map<String, String> addressMap = new HashMap<>();
        /** Formats epoch milliseconds using the JVM's default time zone. */
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        /** Output key instance, reused across map() calls. */
        PhoneLog pl = new PhoneLog();
        Text text = new Text();

        /**
         * Loads both lookup tables from MySQL before any record is mapped.
         *
         * @throws IOException if no connection is available or a query fails; failing here
         *     stops the task instead of silently emitting records with "null" names
         */
        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            // The connection is cached inside DBHelper, so it is deliberately not closed here.
            Connection connection = DBHelper.getConnection();
            if (connection == null) {
                throw new IOException("Unable to connect to MySQL; cannot load lookup tables");
            }
            try (Statement statement = connection.createStatement()) {
                loadLookup(statement, "select * from userphone", userMap);
                loadLookup(statement, "select * from allregion", addressMap);
            } catch (SQLException e) {
                throw new IOException("Failed to load lookup tables from MySQL", e);
            }
        }

        /** Runs {@code sql} and stores column 2 -> column 3 of every row into {@code target}. */
        private static void loadLookup(Statement statement, String sql, Map<String, String> target)
                throws SQLException {
            try (ResultSet rows = statement.executeQuery(sql)) {
                while (rows.next()) {
                    target.put(rows.getString(2), rows.getString(3));
                }
            }
        }

        /**
         * Converts one input line into a cleaned {@link PhoneLog} record.
         *
         * <p>Lines that do not have exactly six comma-separated fields, or whose timestamps are
         * not valid integers, are skipped and counted under the MALFORMED_RECORDS counter.
         */
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String[] split = value.toString().split(",");
            if (split.length != 6) {
                context.getCounter(COUNTER_GROUP, MALFORMED_COUNTER).increment(1);
                return;
            }

            long startTimestamp;
            long endTimestamp;
            try {
                startTimestamp = Long.parseLong(split[2]);
                endTimestamp = Long.parseLong(split[3]);
            } catch (NumberFormatException e) {
                // A single bad line must not kill the whole cleaning job.
                context.getCounter(COUNTER_GROUP, MALFORMED_COUNTER).increment(1);
                return;
            }

            String trueName1 = userMap.get(split[0]);
            String trueName2 = userMap.get(split[1]);
            String address1 = addressMap.get(split[4]);
            String address2 = addressMap.get(split[5]);

            // Input timestamps are in seconds; the formatter expects milliseconds.
            String startTime = sdf.format(startTimestamp * 1000);
            String endTime = sdf.format(endTimestamp * 1000);
            long timeLen = endTimestamp - startTimestamp;

            pl.SetPhoneLog(trueName1, trueName2, split[0], split[1], startTime, endTime, timeLen,
                    address1, address2);
            context.write(pl, NullWritable.get());
        }
    }

    /**
     * Configures and runs the map-only job, reading {@code /user/test/input/a.txt} and writing
     * to {@code /user/test/output}. The process exits with status 0 on success and 1 on failure.
     */
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        job.setJarByClass(LogMR.class);
        job.setMapperClass(MyMapper.class);
        job.setMapOutputKeyClass(PhoneLog.class);
        job.setMapOutputValueClass(NullWritable.class);
        // No reduce phase: mapper output is written directly to the output directory.
        job.setNumReduceTasks(0);

        Path inPath = new Path("/user/test/input/a.txt");
        Path out = new Path("/user/test/output");
        FileInputFormat.setInputPaths(job, inPath);
        FileOutputFormat.setOutputPath(job, out);

        boolean succeeded = job.waitForCompletion(true);
        System.exit(succeeded ? 0 : 1);
    }
}
step/com/DBHelper.java
package com;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;

/**
 * Holds a single, lazily created JDBC connection to the {@code mydb} MySQL database.
 */
public class DBHelper {
    private static final String driver = "com.mysql.jdbc.Driver";
    private static final String url =
            "jdbc:mysql://localhost:3306/mydb?useUnicode=true&characterEncoding=UTF-8";
    // Database user name.
    private static final String username = "root";
    // Database password: chosen when the database was installed, so it differs per machine.
    private static final String password = "123123";
    // Cached database connection; null until the first successful connect.
    private static Connection conn = null;

    static {
        try {
            Class.forName(driver);
        } catch (Exception ex) {
            ex.printStackTrace();
        }
    }

    private DBHelper() {
        // Static utility class; not meant to be instantiated.
    }

    /**
     * Returns the cached connection, opening a new one if none exists yet or if the cached one
     * has been closed.
     *
     * @return an open connection, or {@code null} if connecting failed (the failure's stack
     *     trace is printed to standard error)
     */
    public static Connection getConnection() {
        try {
            if (conn == null || conn.isClosed()) {
                conn = DriverManager.getConnection(url, username, password);
            }
        } catch (SQLException e) {
            e.printStackTrace();
            // Never hand out a connection that is closed or in an unknown state.
            conn = null;
        }
        return conn;
    }
}
step/com/PhoneLog.java
package com;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.Objects;

import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;

/**
 * One cleaned call record: both users' names and phone numbers, the formatted start and end
 * times, the call duration, and both users' addresses.
 *
 * <p>Instances are mutable so that a single object can be reused across records. Because
 * {@link #hashCode()} depends on the field values, do not mutate an instance while it is stored
 * in a hash-based collection.
 */
public class PhoneLog implements WritableComparable<PhoneLog> {
    private String userA;
    private String userB;
    private String userA_Phone;
    private String userB_Phone;
    private String startTime;
    private String endTime;
    private Long timeLen;
    private String userA_Address;
    private String userB_Address;

    /** Creates an empty record; all fields are {@code null} until set. */
    public PhoneLog() {
    }

    /** Sets every field of this record in one call. */
    public void SetPhoneLog(String userA, String userB, String userA_Phone, String userB_Phone,
            String startTime, String endTime, Long timeLen, String userA_Address,
            String userB_Address) {
        this.userA = userA;
        this.userB = userB;
        this.userA_Phone = userA_Phone;
        this.userB_Phone = userB_Phone;
        this.startTime = startTime;
        this.endTime = endTime;
        this.timeLen = timeLen;
        this.userA_Address = userA_Address;
        this.userB_Address = userB_Address;
    }

    public String getUserA_Phone() {
        return userA_Phone;
    }

    public void setUserA_Phone(String userA_Phone) {
        this.userA_Phone = userA_Phone;
    }

    public String getUserB_Phone() {
        return userB_Phone;
    }

    public void setUserB_Phone(String userB_Phone) {
        this.userB_Phone = userB_Phone;
    }

    public String getUserA() {
        return userA;
    }

    public void setUserA(String userA) {
        this.userA = userA;
    }

    public String getUserB() {
        return userB;
    }

    public void setUserB(String userB) {
        this.userB = userB;
    }

    public String getStartTime() {
        return startTime;
    }

    public void setStartTime(String startTime) {
        this.startTime = startTime;
    }

    public String getEndTime() {
        return endTime;
    }

    public void setEndTime(String endTime) {
        this.endTime = endTime;
    }

    public Long getTimeLen() {
        return timeLen;
    }

    public void setTimeLen(Long timeLen) {
        this.timeLen = timeLen;
    }

    public String getUserA_Address() {
        return userA_Address;
    }

    public void setUserA_Address(String userA_Address) {
        this.userA_Address = userA_Address;
    }

    public String getUserB_Address() {
        return userB_Address;
    }

    public void setUserB_Address(String userB_Address) {
        this.userB_Address = userB_Address;
    }

    /**
     * Serializes all fields in declaration order.
     *
     * <p>Every field must be non-null when this is called: {@code writeUTF} rejects a null
     * string and a null {@code timeLen} cannot be unboxed.
     */
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(userA);
        out.writeUTF(userB);
        out.writeUTF(userA_Phone);
        out.writeUTF(userB_Phone);
        out.writeUTF(startTime);
        out.writeUTF(endTime);
        out.writeLong(timeLen);
        out.writeUTF(userA_Address);
        out.writeUTF(userB_Address);
    }

    /** Reads all fields back in the same order that {@link #write} emits them. */
    @Override
    public void readFields(DataInput in) throws IOException {
        userA = in.readUTF();
        userB = in.readUTF();
        userA_Phone = in.readUTF();
        userB_Phone = in.readUTF();
        startTime = in.readUTF();
        endTime = in.readUTF();
        timeLen = in.readLong();
        userA_Address = in.readUTF();
        userB_Address = in.readUTF();
    }

    /** Returns the nine fields joined by commas; null fields are rendered as "null". */
    @Override
    public String toString() {
        return userA + "," + userB + "," + userA_Phone + "," + userB_Phone + "," + startTime + ","
                + endTime + "," + timeLen + "," + userA_Address + "," + userB_Address;
    }

    /**
     * Orders records by comparing every field in declaration order, with {@code null} sorting
     * before any non-null value. The result is zero exactly when {@link #equals} is true, so
     * the ordering is antisymmetric and transitive as the {@link Comparable} contract requires.
     */
    @Override
    public int compareTo(PhoneLog pl) {
        int result = compareNullable(userA, pl.userA);
        if (result == 0) {
            result = compareNullable(userB, pl.userB);
        }
        if (result == 0) {
            result = compareNullable(userA_Phone, pl.userA_Phone);
        }
        if (result == 0) {
            result = compareNullable(userB_Phone, pl.userB_Phone);
        }
        if (result == 0) {
            result = compareNullable(startTime, pl.startTime);
        }
        if (result == 0) {
            result = compareNullable(endTime, pl.endTime);
        }
        if (result == 0) {
            result = compareNullable(timeLen, pl.timeLen);
        }
        if (result == 0) {
            result = compareNullable(userA_Address, pl.userA_Address);
        }
        if (result == 0) {
            result = compareNullable(userB_Address, pl.userB_Address);
        }
        return result;
    }

    /** Compares two possibly-null values, treating {@code null} as the smallest value. */
    private static <T extends Comparable<T>> int compareNullable(T left, T right) {
        if (left == right) {
            return 0;
        }
        if (left == null) {
            return -1;
        }
        if (right == null) {
            return 1;
        }
        return left.compareTo(right);
    }

    /** Two records are equal when all nine fields are equal. */
    @Override
    public boolean equals(Object obj) {
        if (this == obj) {
            return true;
        }
        if (!(obj instanceof PhoneLog)) {
            return false;
        }
        PhoneLog other = (PhoneLog) obj;
        return Objects.equals(userA, other.userA)
                && Objects.equals(userB, other.userB)
                && Objects.equals(userA_Phone, other.userA_Phone)
                && Objects.equals(userB_Phone, other.userB_Phone)
                && Objects.equals(startTime, other.startTime)
                && Objects.equals(endTime, other.endTime)
                && Objects.equals(timeLen, other.timeLen)
                && Objects.equals(userA_Address, other.userA_Address)
                && Objects.equals(userB_Address, other.userB_Address);
    }

    /** Hash over all nine fields, consistent with {@link #equals}. */
    @Override
    public int hashCode() {
        return Objects.hash(userA, userB, userA_Phone, userB_Phone, startTime, endTime, timeLen,
                userA_Address, userB_Address);
    }
}
最后重启 Hadoop(执行 `start-all.sh`),完成评测。
来源地址:https://blog.csdn.net/qq_61604164/article/details/127868559