网站首页  汉语字词  英语词汇  考试资料  写作素材  旧版资料

请输入您要查询的考试资料:

 

标题 为 Mahout 增加聚类评估功能
内容
    聚类算法及聚类评估 Silhouette 简介
    聚类算法简介
    聚类(clustering)是属于无监督学习(Unsupervised learning)的一种,用来把一组数据划分为几类,每类中的数据尽可能的相似,而不同类之间尽可能的差异最大化。通过聚类,可以为样本选取提供参考,或进行根源分析,或作为其它算法的预处理步骤。
    聚类算法中,最经典的要属于 Kmeans 算法,它的基本思想是:假设我们要把一组数据聚成 N 类,那就:
    把数据中的每个样本作为一个向量,记作Ā
    首先随机选取 n 个样本,把这 n 个样本作为 N 类的中心点, 称为 centroid
    针对数据中的所有样本,计算到 n 个 centroid 的距离,距离哪个中心点最近,就属于哪一类
    在每一类中,重新选取 centroid,假设该类有 k 个样本,则 centroid 为
    重复 2,3 直到 centroid 的变化小于预设的值。
    Mahout 是一个开源的机器学习软件,提供了应用推荐、聚类、分类、Logistic 回归分析等算法。特别是由于结合了 Hadoop 的大数据处理能力,每个算法都可以作为独立的 job 方便的部署在 Hadoop 平台上,因此得到了越来越广的应用。在聚类领域,Mahout 提供了 Kmeans,LDA, Canopy 等多种算法。
    聚类评估算法 Silhouette 简介
    在 Kmeans 中,我们会注意到需要我们预先设置聚合成几类。实际上,在聚类的过程中我们也不可能预先知道,那只能分成 2 类,3 类,……n 类这样进行尝试,并评估每次的聚类效果。
    实际上,由于聚类的无监督学习特性,无论什么算法都需要评估效果。在聚类的评估中,有基于外部数据的评价,也有单纯的基于聚类本身的评价,其基本思想就是:在同一类中,各个数据点越近越好,并且和类外的数据点越远越好;前者称为内聚因子(cohension),后者称为离散因子(separation)。
    把这两者结合起来,就形成了评价聚类效果的 Silhouette 因子:
    首先看如何评价一个点的聚类效果:
    a = 一个点到同一聚类内其它点的平均距离
    b=min(一个点到其他聚类内的点的平均距离)
    Silhouette 因子s = 1 – a/b (a<b) 或b/a -1 (a>=b)
    衡量整体聚类的效果,则是所有点的 Silhouette 因子的平均值。范围应该在 (-1,1), 值越大则说明聚类效果越好。
    图 1.Silhouette 中内聚、离散因子示意
    以图 1 为例。图 1 显示的是一个具有 9 个点的聚类,三个圆形表示聚成了三类,其中的黄点表示质心(centroid)。为了评估图 1 中深蓝色点的聚类效果,其内聚因子a就是该点到所在圆中其它三个点的平均距离。离散因子b的计算相对复杂:我们需要先求出到该点到右上角圆中的三个点的平均距离,记为 b1;然后求出该点到右下角圆中两个点的平均距离,记为 b2;b1 和 b2 的较小值则为b。
    在 IBM 的 SPSS Clementine 中,也有 Silhouett 评估算法的实现,不过 IBM 提供的是一个简化版本,把一个点到一个类内的距离的平均值,简化为到该类质心(centroid)的距离,具体来说,就是:
    图 2.IBM 关于内聚、离散因子的简化实现
    还是以上面描述的 9 个点聚成 3 类的例子来说明。IBM 的实现把a的实现简化为到深蓝色的点所在的质心的距离。计算b时候,还是要先计算 b1 和 b2,然后求最小值。但 b1 简化为到右上角圆质心的距离;b2 简化为到右下角圆质心的距离。
    在下面的内容中,我们尝试利用 IBM 简化后的公式为 Mahout 增加聚类评估功能。
    Mahout 聚类过程分析
    Mahout 运行环境简介
    前面说过,Mahout 是依赖 Hadoop 环境,每一个算法或辅助功能都是作为 Hadoop 的一个单独的 job 来运行,所以必须准备好一个可运行的 Hadoop 环境,(至少本文写作时候使用的 Mahout0.9 还在依赖 Hadoop),如何安装配置一个可运行的 Hadoop 环境不在这篇文章的介绍范围内。请自行参考 Hadoop 网站。需要说明的是,本文采用的 Hadoop 为 2.2.0。
    安装完 Hadoop 后,下载 mahout-distribution-0.9,解压缩后的重要内容如下:
    bin/: 目录下有 Mahout 可执行脚本
    mahout-examples-0.9-job.jar,各种算法的实现类
    example/ 各种实现算法的源码
    conf/ 存放各实现类的配置文件,其中重要的为 driver.classes.default.props,如果增加实现算法类,可以在该文件中增加配置项,从而可以被 Mahout 启动脚本调用。
    单独执行 Mahout,是一个实现的各种功能的简介,如下例:
    执行 /data01/shanlei/src/mahout-distribution-0.9/bin/mahout
    输出:
    MAHOUT_LOCAL is not set; adding
    HADOOP_CONF_DIR to classpath.
    Running on hadoop, using /data01/shanlei/hadoop-2.2.0/bin/hadoop and
    HADOOP_CONF_DIR=/data01/shanlei/hadoop-2.2.0/conf
    p1 is org.apache.mahout.driver.MahoutDriver
    MAHOUT-JOB:
    /data01/shanlei/src/mahout-distribution-0.9/examples/target/mahout-examples-0.9-job.jar
    An example program must be given as the first argument.
    Valid program names are:
    arff.vector: : Generate Vectors from an ARFF file or directory
    assesser: : assesse cluster result using silhoueter algorithm
    baumwelch: : Baum-Welch algorithm for unsupervised HMM training
    canopy: : Canopy clustering
    cat: : Print a file or resource as the logistic regression models would see it
    cleansvd: : Cleanup and verification of SVD output
    clusterdump: : Dump cluster output to text
    clusterpp: : Groups Clustering Output In Clusters
    cmdump: : Dump confusion matrix in HTML or text formats
    concatmatrices: : Concatenates 2 matrices of same cardinality into a single matrix
    ……
    如果要执行某种算法,如上面结果中显示的 canopy,就需要执行 mahout canopy 加上该算法需要的其它参数。
    另外,Mahout 算法的输入输出,都是在 Hadoop HDFS 上,因此需要通过 hdfs 命令上传到 hdfs 文件系统;输出大多为 Mahout 特有的二进制格式,需要通过 mahout seqdumper 等命令来导出并转换为可读文本。
    准备输入
    Mahout 算法使用的 input 需要特定格式的 Vector 文件,不能够直接使用一般的文本文件,因此需要把文本转换为 Vector 文件,好在 Mahout 自身提供了这样的类:
    org.apache.mahout.clustering.conversion.InputDriver。
    在 Mahout 的 conf 目录中的 driver.classes.default.props 增加如下行:
    org.apache.mahout.clustering.conversion.InputDriver = input2Seq : create sequence file from blank separated files,然后就可以为 Mahout 增加一个功能,把空格分隔的文本文件转换为 Mahout 聚类可以使用的向量。
    如下面的数据所示,该数据每行为一个包含 6 个属性的向量:
    1 4 3 11 4 3
    2 2 5 2 10 3
    1 1 2 2 10 1
    1 4 2 11 5 4
    1 1 3 2 10 1
    2 4 5 9 5 2
    2 6 5 3 8 1
    执行 ./mahout input2Seq -i /shanlei/userEnum -o /shanlei/vectors 则产生聚类需要的向量文件。
    聚类
    以 Kmeans 聚类为例:
    ./mahout kmeans --input /shanlei/vectors --output /shanlei/kmeans -c /shanlei/k --maxIter 5 -k 8 –cl
    -k 8 指明产生 8 类,执行完成后,在/shanlei/kmeans/下会产生: clusters-0,clusters-1,… …,clusters-n-final 目录,每个目录都是一次迭代产生的 centroids, 目录数会受 --maxIter 控制;最后的结果会加上 final。
    利用 Mahout 的 clusterdump 功能我们可以查看聚类的结果:
    ./mahout clusterdump -i /shanlei/kmeans/clusters-2-final -o ./centroids.txt
    more centroids.txt:
    VL-869{n=49 c=[1.163, 5.082, 4.000,
    4.000, 4.592, 2.429] r=[0.370, 0.965, 1.030, 1.245, 1.244, 1.161]}
    VL-949{n=201 c=[1.229, 4.458, 4.403,
    10.040, 6.134, 1.458] r=[0.420, 1.079, 0.836, 1.196, 1.392, 0.852]}
    … …
    VL-980{n=146 c=[1.281, 2.000, 4.178,
    2.158, 9.911, 1.918] r=[0.449, 0.712, 1.203, 0.570, 0.437, 1.208]}
    VL-869 中的 869 为该类的 id,c=[1.163, 5.082, 4.000, 4.000, 4.592, 2.429] 为 centroid 的坐标,n=49 表示该类中数据点的个数。
    如果使用-cl 参数,则在/shanlei/kmeans/下会产生 clusteredPoints,利用 Mahout 的 seqdumper 可以看其内容:
    Input Path:
    hdfs://rac122:18020/shanlei/kmeans/clusteredPoints/part-m-00000
    Key class: class
    org.apache.hadoop.io.IntWritable Value Class: class
    org.apache.mahout.clustering.classify.WeightedPropertyVectorWri
    table
    Key: 301: Value: wt: 1.0 distance:
    6.6834472852629006 vec: 1 = [1.000, 4.000, 3.000, 11.000, 4.000, 3.000]
    Key: 980: Value: wt: 1.0 distance:
    2.3966504034528384 vec: 2 = [2.000, 2.000, 5.000, 2.000, 10.000, 3.000]
    Key 对应的则是相关聚类的 id,distance 为到 centroid 的距离。vec 则是原始的向量。
    从 Mahout 的聚类输出结果来看,能够很容易的实现 IBM 简化后的 Silhouette 算法,内聚因子 (a) 可以简单的获取到,而离散因子 (b) 也能够简单的计算实现。下面我们就来设计 Mahout 中的实现。
    Mahout 中 Silhouette 实现
    算法设计:
    遵循 Hadoop 上 MR 程序的设计原则,算法设计考虑了 mapper,reducer 及 combiner 类。
    Mapper 设计:
    输入目录:聚类的最终结果目录 clusteredPoints(通过命令行参数-i 设置),
    输入:
    Key:IntWritable,Value:WeightedPropertyVectorWritable
    输出:
    Key:IntWritable(无意义,常量 1),Value:Text(单个点的 Silhouette 值,格式为“cnt,Silhouette 值”)
    Setup 过程:
    因为需要计算 separation 时候要访问其它的 centroids,所以在 setup 中读取(通过命令行参数-c 设置)并缓存。
    Map 过程:
    由于输入的 Value 为 WeightedPropertyVectorWritable,可以通过访问字段 distance获得参数 a,并遍历缓存的 centroids,针对其 id 不等于 Key 的,逐一计算距离,其最小的就是参数 b。
    Map 的结果 Key 使用常量 1,Value 为形如“1,0.23”这样的“cnt,Silhouette 值”格式。
    Reducer 设计:
    输入:
    Key:IntWritable(常量 1),Value: Text (combine 后的中间 Silhouette 值,格式为“cnt,Silhouette 值”)。
    输出:
    Key:IntWritable(常量 1),Value:整个聚类的 Silhouette 值,格式为“cnt,Silhouette 值”。
    输出目录:最终文件的产生目录,通过命令行参数-o 设置。
    Reduce 过程:
    根据“,”把每个 Value,分解为 cnt,和 Silhouette,最后进行加权平均。
    Combiner 设计:
    为减少数据的 copy,采用 combiner,其实现即为 reducer 的实现。
    实现代码:
    Mapper 类:
    public class AssesserMapper extends Mapper<IntWritable,
    WeightedPropertyVectorWritable, IntWritable, Text> {
    private List<Cluster> clusterModels;
    private static final Logger log = LoggerFactory.getLogger(ClusterAssesser.class);
    protected void setup(Context context) throws IOException, InterruptedException {
    super.setup(context);
    Configuration conf = context.getConfiguration();
    String clustersIn = conf.get(ClusterClassificationConfigKeys.CLUSTERS_IN);
    clusterModels = Lists.newArrayList();
    if (clustersIn != null && !clustersIn.isEmpty()) {
    Path clustersInPath = new Path(clustersIn);
    clusterModels = populateClusterModels(clustersInPath, conf);
    }
    }
    private static List<Cluster> populateClusterModels(Path clustersIn,
    Configuration conf) throws IOException {
    List<Cluster> clusterModels = Lists.newArrayList();
    Path finalClustersPath = finalClustersPath(conf, clustersIn);
    Iterator<?> it = new SequenceFileDirValueIterator<Writable>(finalClustersPath, PathType.LIST,
    PathFilters.partFilter(), null, false, conf);
    while (it.hasNext()) {
    ClusterWritable next = (ClusterWritable) it.next();
    Cluster cluster = next.getValue();
    cluster.configure(conf);
    clusterModels.add(cluster);
    }
    return clusterModels;
    }
    private static Path finalClustersPath(Configuration conf,
    Path clusterOutputPath) throws IOException {
    FileSystem fileSystem = clusterOutputPath.getFileSystem(conf);
    FileStatus[] clusterFiles = fileSystem.listStatus(clusterOutputPath,
    PathFilters.finalPartFilter());
    log.info("files: {}", clusterOutputPath.toString());
    return clusterFiles[0].getPath();
    }
    protected void map(IntWritable key, WeightedPropertyVectorWritable vw, Context context)
    throws IOException, InterruptedException {
    int clusterId=key.get();
    double cohension,separation=-1,silhouete;
    Map<Text,Text> props=vw.getProperties();
    cohension=Float.valueOf(props.get(new Text("distance")).toString());
    Vector vector = vw.getVector();
    for ( Cluster centroid : clusterModels) {
    if (centroid.getId()!=clusterId) {
    DistanceMeasureCluster distanceMeasureCluster = (DistanceMeasureCluster) centroid;
    DistanceMeasure distanceMeasure = distanceMeasureCluster.getMeasure();
    double f = distanceMeasure.distance(centroid.getCenter(), vector);
    if (f<separation || separation<-0.5) separation=f;
    }
    }
    Text value=new Text(Long.toString(1)+","+Double.toString(silhouete));
    IntWritable okey=new IntWritable();
    okey.set(1);
    context.write(okey, value);
    }
    }
    Reducer 类:
    public class AssesserReducer extends Reducer<IntWritable, Text, IntWritable, Text> {
    private static final Logger log = LoggerFactory.getLogger(ClusterAssesser.class);
    protected void setup(Context context) throws IOException, InterruptedException {
    super.setup(context);
    log.info("reducer");
    }
    private static final Pattern SEPARATOR = Pattern.compile("[t,]");
    public void reduce(IntWritable key, Iterable<Text> values,
    Context context)
    throws IOException, InterruptedException {
    long cnt=0;
    double total=0;
    for (Text value : values) {
    String[] p=SEPARATOR.split(value.toString());
    Long itemCnt=Long.parseLong(p[0]);
    double v=Double.parseDouble(p[1]);
    total=total+ itemCnt*v;
    cnt=cnt+itemCnt;
    }
    }
    }
    Job 类:
    public class ClusterAssesser extends AbstractJob {
    private ClusterAssesser() {
    }
    public int run(String[] args) throws Exception {
    addInputOption();
    addOutputOption();
    //addOption(DefaultOptionCreator.methodOption().create());
    addOption(DefaultOptionCreator.clustersInOption()
    .withDescription("The input centroids").create());
    if (parseArguments(args) == null) {
    return -1;
    }
    Path input = getInputPath();
    Path output = getOutputPath();
    Path clustersIn = new Path(getOption(DefaultOptionCreator.CLUSTERS_IN_OPTION));
    if (getConf() == null) {
    setConf(new Configuration());
    }
    run(getConf(), input, clustersIn, output);
    return 0;
    }
    private void run(Configuration conf, Path input, Path clustersIn,
    Path output)throws IOException, InterruptedException,
    ClassNotFoundException {
    conf.set(ClusterClassificationConfigKeys.CLUSTERS_IN, clustersIn.toUri().toString());
    Job job = new Job(conf, "Cluster Assesser using silhouete over input: " + input);
    job.setJarByClass(ClusterAssesser.class);
    job.setInputFormatClass(SequenceFileInputFormat.class);
    job.setOutputFormatClass(SequenceFileOutputFormat.class);
    job.setMapperClass(AssesserMapper.class);
    job.setCombinerClass(AssesserReducer.class);
    job.setReducerClass(AssesserReducer.class);
    job.setNumReduceTasks(1);
    job.setOutputKeyClass(IntWritable.class);
    job.setOutputValueClass(Text.class);
    FileInputFormat.addInputPath(job, input);
    FileOutputFormat.setOutputPath(job, output);
    if (!job.waitForCompletion(true)) {
    throw new InterruptedException("Cluster Assesser Job failed processing " + input);
    }
    }
    private static final Logger log = LoggerFactory.getLogger(ClusterAssesser.class);
    public static void main(String[] args) throws Exception {
    ToolRunner.run(new Configuration(), new ClusterAssesser(), args);
    }
    }
    编译运行:
    编译环境准备:
    在从 Mahout 网站下载的包中,同时包含了源码以及可以导入到 eclipse 的工程,导入后,会产生 mahout-core,mahout-distribution,mahout-example 等不同的 projects,我们首先编译一遍,保证没有错误,然后再考虑如何增加自己的代码。
    当然,Mahout 在顶层目录也提供了一个编译脚本:compile.sh, 可以在命令行完成编译。
    代码编译:
    把自己的代码放到 example/src/main/java/目录下,自动编译就可以了。输出产生的类:com.ai.cluster.assesser.ClusterAssesser,然后就被打包到了 examples/target/mahout-examples-0.9-job.jar 中。
    配置:
    把 examples/target/mahout-examples-0.9-job.jar 覆盖顶层的 mahout-examples-0.9-job.jar
    通过在 conf/driver.classes.default.props 文件添加如下行,把我们的实现类加入到 Mahout 的配置中,从而可以通过 Mahout 脚本执行:
    com.ai.cluster.assesser.ClusterAssesser = assesser : assesse cluster result using silhoueter algorithm
    运行
    利用前面我们做聚类过程分析产生的聚类结果:
    bin/mahout assesser -i /shanlei/kmeans/clusteredPoints -o /shanlei/silhouete -c
    /shanlei/kmeans --tempDir /shanlei/temp
    其中的-c 为输入聚类的中心点,-i 为聚类的点 –o 为最终的输出。
    查看结果:
    bin/mahout seqdumper -i /shanlei/silhouete -o ./a.txt
    more a.txt:
    Input Path: hdfs://rac122:18020/shanlei/silhouete/part-r-00000
    Key class: class org.apache.hadoop.io.IntWritable Value Class: class org.apache.hadoop.io.Text
    Key: 1: Value: 1000,0.5217678842906524
    Count: 1
    1000 表示共 1000 个点,0.52176 为聚类的 Silhouette 值。大于 0.5,看起来效果还行。
    结束语:
    不同于其它的套件,Mahout 从发布起就是为处理海量数据、为生产而准备的。直到现在,Mahout 的重心还是在优化各种算法上面,对易用性考虑不多,而且学习成本也很高。但 Mahout 不仅仅提供某些特定的算法,而且还把前期准备中的数据清洗,转换,以及后续的效果评估、图形化展现都集成在一块,方便用户。这不仅是一种发展趋势,也是争取用户的一个关键因素。希望大家都能够加入进来,提供各种各样的辅助功能,让 Mahout 变得易用起来。
随便看

 

在线学习网考试资料包含高考、自考、专升本考试、人事考试、公务员考试、大学生村官考试、特岗教师招聘考试、事业单位招聘考试、企业人才招聘、银行招聘、教师招聘、农村信用社招聘、各类资格证书考试等各类考试资料。

 

Copyright © 2002-2024 cuapp.net All Rights Reserved
更新时间:2025/5/13 3:23:18