文章详情

短信预约-IT技能 免费直播动态提醒

请输入下面的图形验证码

提交验证

短信预约提醒成功

第1关:MapReduce综合应用案例 — 电信数据清洗

2023-10-24 14:26

关注

根据提示,在右侧编辑器补充代码,对数据按照一定规则进行清洗。

数据说明如下: a.txt

数据切分方式:英文逗号(`,`)

数据所在位置:/user/test/input/a.txt

15733218050,15778423030,1542457633,1542457678,450000,530000

| 呼叫者手机号 | 接受者手机号 | 开始时间戳(s) | 结束时间戳(s) | 呼叫者地址省份编码 | 接受者地址省份编码 |
| --- | --- | --- | --- | --- | --- |
| 15733218050 | 15778423030 | 1542457633 | 1542457678 | 450000 | 530000 |

Mysql数据库:

用户名:root 密码:123123

数据库名:mydb

用户表:userphone

| 列名 | 类型 | 非空 | 是否自增 | 介绍 |
| --- | --- | --- | --- | --- |
| id | int(11) |  |  | 用户ID |
| phone | varchar(255) |  |  | 手机号 |
| trueName | varchar(255) |  |  | 真实姓名 |

地址省份表:allregion

| 列名 | 类型 | 非空 | 是否自增 | 介绍 |
| --- | --- | --- | --- | --- |
| id | int(11) |  |  | 用户ID |
| CodeNum | varchar(255) |  |  | 编号 |
| Address | varchar(255) |  |  | 地址 |

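清洗规则(依据下方参考代码的实现):

1. 每行按英文逗号切分,只处理恰好包含 6 个字段的记录;
2. 根据 userphone 表,将呼叫者和接受者的手机号分别替换为对应的真实姓名(trueName);
3. 根据 allregion 表,将两个省份编码(CodeNum)分别替换为对应的地址名称(Address);
4. 将开始、结束时间戳(单位:秒)格式化为 yyyy-MM-dd HH:mm:ss;
5. 通话时长(秒)= 结束时间戳 − 开始时间戳。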

数据清洗后如下:

邓二,张倩,13666666666,15151889601,2018-03-29 10:58:12,2018-03-29 10:58:42,30,黑龙江省,上海市

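| 用户名A | 用户名B | 用户A的手机号 | 用户B的手机号 | 开始时间 | 结束时间 | 通话时长(s) | 用户A的地址 | 用户B的地址 |
| --- | --- | --- | --- | --- | --- | --- | --- | --- |
| 邓二 | 张倩 | 13666666666 | 15151889601 | 2018-03-29 10:58:12 | 2018-03-29 10:58:42 | 30 | 黑龙江省 | 上海市 |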

step/com/LogMR.java

package com;import java.io.IOException;import java.sql.Connection;import java.sql.ResultSet;import java.sql.SQLException;import java.sql.Statement;import java.text.SimpleDateFormat;import java.util.ArrayList;import java.util.HashMap;import java.util.Iterator;import java.util.List;import java.util.Map;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.FileSystem;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.NullWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.Mapper;import org.apache.hadoop.mapreduce.Reducer;import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;public class LogMR {        static class MyMapper extends Mapper {        Map userMap = new HashMap<>();        Map addressMap = new HashMap<>();        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");        PhoneLog pl = new PhoneLog();        Text text = new Text();        @Override        protected void setup(Context context) throws IOException, InterruptedException {            Connection connection = DBHelper.getConnection();            try {                Statement statement = connection.createStatement();                String sql = "select * from userphone";                ResultSet resultSet = statement.executeQuery(sql);                while (resultSet.next()) {                    String phone = resultSet.getString(2);                    String trueName = resultSet.getString(3);                    userMap.put(phone, trueName);                }                String sql2 = "select * from allregion";                ResultSet resultSetA = statement.executeQuery(sql2);                while (resultSetA.next()) {                    String phone = resultSetA.getString(2);                    String trueName = resultSetA.getString(3);                    addressMap.put(phone, 
trueName);                }            } catch (SQLException e) {                e.printStackTrace();            }        }        @Override        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {            String str = value.toString();            String[] split = str.split(",");            if (split.length == 6) {                String trueName1 = userMap.get(split[0]);                String trueName2 = userMap.get(split[1]);                String address1 = addressMap.get(split[4]);                String address2 = addressMap.get(split[5]);                long startTimestamp = Long.parseLong(split[2]);                String startTime = sdf.format(startTimestamp * 1000);                long endTimestamp = Long.parseLong(split[3]);                String endTime = sdf.format(endTimestamp * 1000);                long timeLen = endTimestamp - startTimestamp;                pl.SetPhoneLog(trueName1, trueName2, split[0], split[1], startTime, endTime, timeLen, address1,                        address2);                context.write(pl, NullWritable.get());            }        }    }    public static void main(String[] args) throws Exception {        Configuration conf = new Configuration();        Job job = Job.getInstance(conf);        job.setJarByClass(LogMR.class);        job.setMapperClass(MyMapper.class);        job.setMapOutputKeyClass(PhoneLog.class);        job.setMapOutputValueClass(NullWritable.class);        job.setNumReduceTasks(0);        Path inPath = new Path("/user/test/input/a.txt");        Path out = new Path("/user/test/output");        FileInputFormat.setInputPaths(job, inPath);        FileOutputFormat.setOutputPath(job, out);        job.waitForCompletion(true);    }    }

step/com/DBHelper.java

package com;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;

/**
 * Provides a lazily opened JDBC connection to the {@code mydb} MySQL database. The connection
 * is cached in a static field and shared by every caller in the same JVM.
 */
public class DBHelper {

    private static final String DRIVER = "com.mysql.jdbc.Driver";
    private static final String URL =
            "jdbc:mysql://localhost:3306/mydb?useUnicode=true&characterEncoding=UTF-8";
    /** Database user name. */
    private static final String USERNAME = "root";
    /** Database password; it is chosen when MySQL is installed, so it differs per environment. */
    private static final String PASSWORD = "123123";
    /** Cached connection; only read and written inside the synchronized getConnection(). */
    private static Connection conn = null;

    static {
        try {
            Class.forName(DRIVER);
        } catch (ClassNotFoundException ex) {
            // Best effort: JDBC 4+ drivers on the classpath normally register themselves with
            // DriverManager, so a missing legacy driver class is reported but not fatal here.
            ex.printStackTrace();
        }
    }

    /**
     * Returns the shared connection. A new one is opened if none exists yet or the cached one
     * has been closed.
     *
     * @return an open connection to {@code mydb}; never {@code null}
     * @throws IllegalStateException if the connection cannot be established; the underlying
     *     {@link SQLException} is attached as the cause
     */
    public static synchronized Connection getConnection() {
        try {
            if (conn == null || conn.isClosed()) {
                conn = DriverManager.getConnection(URL, USERNAME, PASSWORD);
            }
        } catch (SQLException e) {
            throw new IllegalStateException("Cannot connect to " + URL, e);
        }
        return conn;
    }
}

step/com/PhoneLog.java

package com;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.Comparator;
import java.util.Objects;

import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;

/**
 * One cleaned call record. {@link #toString()} renders it as the comma-separated line
 * {@code userA,userB,userA_Phone,userB_Phone,startTime,endTime,timeLen,userA_Address,userB_Address}.
 *
 * <p>Any field may be {@code null}, for example when a phone number is not found in the
 * lookup table. Serialization, ordering and equality all handle {@code null} fields.
 */
public class PhoneLog implements WritableComparable<PhoneLog> {

    private static final Comparator<String> NULLS_FIRST_STRING =
            Comparator.nullsFirst(Comparator.<String>naturalOrder());
    private static final Comparator<Long> NULLS_FIRST_LONG =
            Comparator.nullsFirst(Comparator.<Long>naturalOrder());

    /** Total order over all fields, consistent with {@link #equals(Object)}. */
    private static final Comparator<PhoneLog> ORDER =
            Comparator.comparing(PhoneLog::getStartTime, NULLS_FIRST_STRING)
                    .thenComparing(PhoneLog::getEndTime, NULLS_FIRST_STRING)
                    .thenComparing(PhoneLog::getUserA_Phone, NULLS_FIRST_STRING)
                    .thenComparing(PhoneLog::getUserB_Phone, NULLS_FIRST_STRING)
                    .thenComparing(PhoneLog::getUserA, NULLS_FIRST_STRING)
                    .thenComparing(PhoneLog::getUserB, NULLS_FIRST_STRING)
                    .thenComparing(PhoneLog::getTimeLen, NULLS_FIRST_LONG)
                    .thenComparing(PhoneLog::getUserA_Address, NULLS_FIRST_STRING)
                    .thenComparing(PhoneLog::getUserB_Address, NULLS_FIRST_STRING);

    private String userA;
    private String userB;
    private String userA_Phone;
    private String userB_Phone;
    private String startTime;
    private String endTime;
    private Long timeLen;
    private String userA_Address;
    private String userB_Address;

    /** No-arg constructor required by Hadoop to instantiate the type before readFields. */
    public PhoneLog() {
    }

    /** Sets all nine fields at once, so a single instance can be reused across records. */
    public void SetPhoneLog(String userA, String userB, String userA_Phone, String userB_Phone, String startTime,
            String endTime, Long timeLen, String userA_Address, String userB_Address) {
        this.userA = userA;
        this.userB = userB;
        this.userA_Phone = userA_Phone;
        this.userB_Phone = userB_Phone;
        this.startTime = startTime;
        this.endTime = endTime;
        this.timeLen = timeLen;
        this.userA_Address = userA_Address;
        this.userB_Address = userB_Address;
    }

    public String getUserA_Phone() {
        return userA_Phone;
    }

    public void setUserA_Phone(String userA_Phone) {
        this.userA_Phone = userA_Phone;
    }

    public String getUserB_Phone() {
        return userB_Phone;
    }

    public void setUserB_Phone(String userB_Phone) {
        this.userB_Phone = userB_Phone;
    }

    public String getUserA() {
        return userA;
    }

    public void setUserA(String userA) {
        this.userA = userA;
    }

    public String getUserB() {
        return userB;
    }

    public void setUserB(String userB) {
        this.userB = userB;
    }

    public String getStartTime() {
        return startTime;
    }

    public void setStartTime(String startTime) {
        this.startTime = startTime;
    }

    public String getEndTime() {
        return endTime;
    }

    public void setEndTime(String endTime) {
        this.endTime = endTime;
    }

    public Long getTimeLen() {
        return timeLen;
    }

    public void setTimeLen(Long timeLen) {
        this.timeLen = timeLen;
    }

    public String getUserA_Address() {
        return userA_Address;
    }

    public void setUserA_Address(String userA_Address) {
        this.userA_Address = userA_Address;
    }

    public String getUserB_Address() {
        return userB_Address;
    }

    public void setUserB_Address(String userB_Address) {
        this.userB_Address = userB_Address;
    }

    /**
     * Serializes all fields in declaration order. Each field is preceded by a presence flag,
     * so {@code null} values round-trip instead of throwing NullPointerException.
     */
    @Override
    public void write(DataOutput out) throws IOException {
        writeNullable(out, userA);
        writeNullable(out, userB);
        writeNullable(out, userA_Phone);
        writeNullable(out, userB_Phone);
        writeNullable(out, startTime);
        writeNullable(out, endTime);
        out.writeBoolean(timeLen != null);
        if (timeLen != null) {
            out.writeLong(timeLen);
        }
        writeNullable(out, userA_Address);
        writeNullable(out, userB_Address);
    }

    /** Reads fields in exactly the order and format produced by {@link #write(DataOutput)}. */
    @Override
    public void readFields(DataInput in) throws IOException {
        userA = readNullable(in);
        userB = readNullable(in);
        userA_Phone = readNullable(in);
        userB_Phone = readNullable(in);
        startTime = readNullable(in);
        endTime = readNullable(in);
        if (in.readBoolean()) {
            timeLen = in.readLong();
        } else {
            timeLen = null;
        }
        userA_Address = readNullable(in);
        userB_Address = readNullable(in);
    }

    /** Writes a presence flag followed, if present, by the UTF-encoded value. */
    private static void writeNullable(DataOutput out, String value) throws IOException {
        out.writeBoolean(value != null);
        if (value != null) {
            out.writeUTF(value);
        }
    }

    /** Reads a value written by {@link #writeNullable(DataOutput, String)}. */
    private static String readNullable(DataInput in) throws IOException {
        return in.readBoolean() ? in.readUTF() : null;
    }

    @Override
    public String toString() {
        return userA + "," + userB + "," + userA_Phone + "," + userB_Phone + "," + startTime + "," + endTime + ","
                + timeLen + "," + userA_Address + "," + userB_Address;
    }

    /**
     * Orders records by start time, then end time, phones, names, duration and addresses.
     * {@code null} fields sort first.
     */
    @Override
    public int compareTo(PhoneLog pl) {
        return ORDER.compare(this, pl);
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (!(o instanceof PhoneLog)) {
            return false;
        }
        PhoneLog other = (PhoneLog) o;
        return Objects.equals(userA, other.userA)
                && Objects.equals(userB, other.userB)
                && Objects.equals(userA_Phone, other.userA_Phone)
                && Objects.equals(userB_Phone, other.userB_Phone)
                && Objects.equals(startTime, other.startTime)
                && Objects.equals(endTime, other.endTime)
                && Objects.equals(timeLen, other.timeLen)
                && Objects.equals(userA_Address, other.userA_Address)
                && Objects.equals(userB_Address, other.userB_Address);
    }

    @Override
    public int hashCode() {
        return Objects.hash(userA, userB, userA_Phone, userB_Phone, startTime, endTime, timeLen,
                userA_Address, userB_Address);
    }
}

最后重启 Hadoop(执行 `start-all.sh`),然后再进行评测。

来源地址:https://blog.csdn.net/qq_61604164/article/details/127868559

阅读原文内容投诉

免责声明:

① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。

② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341

软考中级精品资料免费领

  • 历年真题答案解析
  • 备考技巧名师总结
  • 高频考点精准押题
  • 2024年上半年信息系统项目管理师第二批次真题及答案解析(完整版)

    难度     813人已做
    查看
  • 【考后总结】2024年5月26日信息系统项目管理师第2批次考情分析

    难度     354人已做
    查看
  • 【考后总结】2024年5月25日信息系统项目管理师第1批次考情分析

    难度     318人已做
    查看
  • 2024年上半年软考高项第一、二批次真题考点汇总(完整版)

    难度     435人已做
    查看
  • 2024年上半年系统架构设计师考试综合知识真题

    难度     224人已做
    查看

相关文章

发现更多好内容

猜你喜欢

AI推送时光机
位置:首页-资讯-数据库
咦!没有更多了?去看看其它编程学习网 内容吧
首页课程
资料下载
问答资讯