文章详情

短信预约-IT技能 免费直播动态提醒

请输入下面的图形验证码

提交验证

短信预约提醒成功

spark应用程序如何在Java项目中运行

2023-05-31 02:07

关注

这篇文章将为大家详细讲解有关spark应用程序如何在Java项目中运行,文章内容质量较高,因此小编分享给大家做个参考,希望大家阅读完这篇文章后对相关知识有一定的了解。

如下所示:

package org.shirdrn.spark.job;import java.io.File;import java.io.IOException;import java.util.Arrays;import java.util.Collections;import java.util.Comparator;import java.util.List;import java.util.regex.Pattern;import org.apache.commons.logging.Log;import org.apache.commons.logging.LogFactory;import org.apache.spark.api.java.JavaPairRDD;import org.apache.spark.api.java.JavaRDD;import org.apache.spark.api.java.JavaSparkContext;import org.apache.spark.api.java.function.FlatMapFunction;import org.apache.spark.api.java.function.Function2;import org.apache.spark.api.java.function.PairFunction;import org.shirdrn.spark.job.maxmind.Country;import org.shirdrn.spark.job.maxmind.LookupService;import scala.Serializable;import scala.Tuple2;public class IPAddressStats implements Serializable {  private static final long serialVersionUID = 8533489548835413763L;  private static final Log LOG = LogFactory.getLog(IPAddressStats.class);  private static final Pattern SPACE = Pattern.compile(" ");  private transient LookupService lookupService;  private transient final String geoIPFile;  public IPAddressStats(String geoIPFile) {   this.geoIPFile = geoIPFile;   try {    // lookupService: get country code from a IP address    File file = new File(this.geoIPFile);    LOG.info("GeoIP file: " + file.getAbsolutePath());    lookupService = new AdvancedLookupService(file, LookupService.GEOIP_MEMORY_CACHE);   } catch (IOException e) {    throw new RuntimeException(e);   }  }  @SuppressWarnings("serial")  public void stat(String[] args) {   JavaSparkContext ctx = new JavaSparkContext(args[0], "IPAddressStats",     System.getenv("SPARK_HOME"), JavaSparkContext.jarOfClass(IPAddressStats.class));   JavaRDD<String> lines = ctx.textFile(args[1], 1);   // splits and extracts ip address filed   JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {    @Override    public Iterable<String> call(String s) {     // 121.205.198.92 - - [21/Feb/2014:00:00:07 +0800] "GET /archives/417.html HTTP/1.1" 200 11465 "http://shiyanjun.cn/archives/417.html/" "Mozilla/5.0 (Windows NT 5.1; rv:11.0) Gecko/20100101 Firefox/11.0"     // ip address     return Arrays.asList(SPACE.split(s)[0]);    }   });   // map   JavaPairRDD<String, Integer> ones = words.map(new PairFunction<String, String, Integer>() {    @Override    public Tuple2<String, Integer> call(String s) {     return new Tuple2<String, Integer>(s, 1);    }   });   // reduce   JavaPairRDD<String, Integer> counts = ones.reduceByKey(new Function2<Integer, Integer, Integer>() {    @Override    public Integer call(Integer i1, Integer i2) {     return i1 + i2;    }   });   List<Tuple2<String, Integer>> output = counts.collect();   // sort statistics result by value   Collections.sort(output, new Comparator<Tuple2<String, Integer>>() {    @Override    public int compare(Tuple2<String, Integer> t1, Tuple2<String, Integer> t2) {     if(t1._2 < t2._2) {       return 1;     } else if(t1._2 > t2._2) {       return -1;     }     return 0;    }   });   writeTo(args, output);  }  private void writeTo(String[] args, List<Tuple2<String, Integer>> output) {   for (Tuple2<&#63;, &#63;> tuple : output) {    Country country = lookupService.getCountry((String) tuple._1);    LOG.info("[" + country.getCode() + "] " + tuple._1 + "\t" + tuple._2);   }  }  public static void main(String[] args) {   // ./bin/run-my-java-example org.shirdrn.spark.job.IPAddressStats spark://m1:7077 hdfs://m1:9000/user/shirdrn/wwwlog20140222.log /home/shirdrn/cloud/programs/spark-0.9.0-incubating-bin-hadoop1/java-examples/GeoIP_DATABASE.dat   if (args.length < 3) {    System.err.println("Usage: IPAddressStats <master> <inFile> <GeoIPFile>");    System.err.println(" Example: org.shirdrn.spark.job.IPAddressStats spark://m1:7077 hdfs://m1:9000/user/shirdrn/wwwlog20140222.log /home/shirdrn/cloud/programs/spark-0.9.0-incubating-bin-hadoop1/java-examples/GeoIP_DATABASE.dat");    System.exit(1);   }   String geoIPFile = args[2];   IPAddressStats stats = new IPAddressStats(geoIPFile);   stats.stat(args);   System.exit(0);  }}

免责声明:

① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。

② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341

软考中级精品资料免费领

  • 历年真题答案解析
  • 备考技巧名师总结
  • 高频考点精准押题
  • 资料下载
  • 历年真题
  • 2024年上半年信息系统项目管理师第二批次真题及答案解析(完整版)

    难度     807人已做
    查看
  • 【考后总结】2024年5月26日信息系统项目管理师第2批次考情分析

    难度     351人已做
    查看
  • 【考后总结】2024年5月25日信息系统项目管理师第1批次考情分析

    难度     314人已做
    查看
  • 2024年上半年软考高项第一、二批次真题考点汇总(完整版)

    难度     433人已做
    查看
  • 2024年上半年系统架构设计师考试综合知识真题

    难度     221人已做
    查看

相关文章

发现更多好内容
咦!没有更多了?去看看其它编程学习网 内容吧
首页课程
资料下载
问答资讯