-
Notifications
You must be signed in to change notification settings - Fork 1
/
index.html
726 lines (678 loc) · 40.4 KB
/
index.html
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
<!DOCTYPE html>
<!--[if IEMobile 7 ]><html class="no-js iem7"><![endif]-->
<!--[if lt IE 9]><html class="no-js lte-ie8"><![endif]-->
<!--[if (gt IE 8)|(gt IEMobile 7)|!(IEMobile)|!(IE)]><!--><html class="no-js" lang="zh"><!--<![endif]-->
<head>
<meta charset="utf-8">
<title>kangfoo's blog</title>
<meta name="author" content="kangfoo">
<meta name="description" content="好记性不如乱笔头!">
<meta name="HandheldFriendly" content="True">
<meta name="MobileOptimized" content="320">
<meta name="OpooPressSiteRoot" content="">
<meta name="viewport" content="width=device-width, initial-scale=1">
<meta name="Generator" content="OpooPress-1.0.3"/>
<meta name="Generated" content="2015-03-03T22:57:35+08:00"/>
<link rel="canonical" href="/">
<link href="/page/2/" rel="next" />
<link href="/favicon.ico" rel="icon">
<link href="/atom.xml" rel="alternate" title="kangfoo's blog" type="application/atom+xml">
<link href="/stylesheets/screen.css" media="screen, projection" rel="stylesheet" type="text/css">
<!--Fonts from Google"s Web font directory at http://google.com/webfonts -->
<link href="http://dn-opstatic.qbox.me/themes/default/stylesheets/fonts.css" rel="stylesheet" type="text/css">
<!--
<link href="//fonts.googleapis.com/css?family=PT+Serif:regular,italic,bold,bolditalic|PT+Sans:regular,italic,bold,bolditalic" rel="stylesheet" type="text/css">
国内网站:http://dn-opstatic.qbox.me/themes/default/stylesheets/fonts.css
国际网站:http://static.opoo.org/themes/default/stylesheets/fonts.css
-->
<link type="text/css" rel="stylesheet" href="/plugins/syntax-highlighter/styles/shCoreDefault.css"/>
<!--[if lt IE 9]><script src="/javascripts/html5shiv.js"></script><![endif]-->
</head>
<body>
<!--[if lt IE 9]><script src="/javascripts/unsupported-browser.js"></script><![endif]-->
<header role="banner"><hgroup>
<h1><a href="/">kangfoo's blog</a></h1>
<h2>工作学习笔记,生活掠影。</h2>
</hgroup>
</header>
<nav role="navigation"><ul class="subscription" data-subscription="rss">
<li><a href="/atom.xml" rel="subscribe-rss" title="subscribe via RSS">RSS</a></li>
</ul>
<form action="http://google.com/search" method="get">
<fieldset role="search">
<input type="hidden" name="q" value="site:http://kangfoo.u.qiniudn.com/" />
<input class="search" type="text" name="q" results="0" placeholder="搜索"/>
</fieldset>
</form>
<fieldset class="mobile-nav">
<select onchange="if (this.value) { window.location.href = this.value;}">
<option value="">导航…</option>
<option value="/" selected="selected">» 首页</option>
<option value="/category/hadoop/">» hadoop</option>
<option value="/category/java/">» java</option>
<option value="/archives/">» 归档</option>
<option value="http://www.opoopress.com/">» OpooPress</option>
<option value="/about/">» 关于</option>
</select>
</fieldset>
<ul class="main-navigation">
<li><a href="/">首页</a></li>
<li><a href="/category/hadoop/">hadoop</a></li>
<li><a href="/category/java/">java</a></li>
<li><a href="/archives/">归档</a></li>
<li><a href="http://www.opoopress.com/" target="_blank">OpooPress</a></li>
<li><a href="/about/">关于</a></li>
</ul>
</nav>
<div id="main">
<div id="content">
<div class="blog-index">
<article>
<header>
<h1 class="entry-title"><a href="/article/2014/04/spring-batch--ru-men/">Springbatch入门</a></h1>
<p class="meta">
<time datetime="2014-04-08T22:23:00+08:00" pubdate>2014年04月08日</time>
| <a href="/article/2014/04/spring-batch--ru-men/#disqus_thread">评论</a>
</p>
</header>
<div class="entry-content"><ol>
<li><p>批处理
在企业级应用系统当中,面对日益复杂的义务及一定规模的数据量,频繁的人机操作会引入一定的时间成本和管理风险。可以采取定时读取大批数据,在执行相应的工作流程,并归档。我首先想到的就是直接使用批处理进行解决。以解决我可能要面对的与特定时间周期相关、数据量大、尽量少人工干涉、自动完成、事后督察等工作。这些工作可以用 存储过程 + shell 等方式实现,但作为应用程序而言,我倾向于使用JAVA api. jdk 7 才有原生的API支持(这是 <a href="http://docs.oracle.com/javaee/7/tutorial/doc/batch-processing.htm">JAVA 7 Batch Processing Tutorial</a>)。貌似6要费费劲。我一直是spring 的粉丝。SpringBatch 自然会上场的。</p>
</li>
<li><p>spring batch
一般的批处理都分有三个阶段
读数据(我的数据目前大部分来自于文件)
处理数据(业务逻辑)
写数据(将业务结果写入数据库)
这些过程又必选考虑效率、事物的粒度、监控、资源开销。读和写、业务处理一般都是独立的模块可直接解耦。谷歌了一番,看中了 spring batch。</p>
</li>
</ol>
<p>那么 spring batch 可以给我们带来什么好处?
Spring Batch作为Spring的一个顶级子项目,是一款优秀的大数据量并行处理框架。通过Spring Batch可以构建出轻量级的健壮的并行处理应用,支持事务、并发、监控,提供统一的接口管理和任务管理。</p>
<p>谷歌文档一堆呀。先列举下我认为不错的。</p>
<ul>
<li><a href="http://projects.spring.io/spring-batch/#quick-start">spring-batch-quick-start</a></li>
<li><a href="http://docs.spring.io/spring-batch/">spring-batch-docs</a></li>
<li><a href="http://blog.csdn.net/shorn/article/details/7744579">2.1.8 中文版的翻译</a></li>
<li><a href="http://wenku.baidu.com/view/9134505a0b1c59eef8c7b456">基于Spring Batch的大数据量并行处理</a></li>
<li><a href="http://www.ibm.com/developerworks/cn/java/j-lo-springbatch1/">使用 Spring Batch 构建企业级批处理应用: 第 1 部分</a></li>
<li><a href="http://spring.io/guides/gs/batch-processing/">spring 官方教程示例</a></li>
<li><a href="https://github.com/chrisjs/maven-springbatch-archetype">maven-springbatch-archetype</a> maven archetype 插件默认使用的是 springbatch 2.2.5 稳定版的。其默认生成的示例和<a href="http://spring.io/guides/gs/batch-processing/"> spring batch 官方文档</a>上提供的数据完全一致,官方介绍。</li>
</ul>
<p>好吧工具算是找的差不多了。
也要开始我的第一个 demo 了。具体的概念先放放。东西弄出来了,在慢慢细嚼。</p>
<ol>
<li><p>首先 git clone <a href="https://github.com/chrisjs/maven-springbatch-archetype">maven-springbatch-archetype</a> maven 插件。同时请确保你自己的版本,我当前使用的 1.4-SNAPSHOT ,按照 readme 一步步执行吧。</p>
</li>
<li><p>生成我们自己的样板工程代码结构</p>
<pre class='brush:shell'>mvn archetype:generate \
-DarchetypeGroupId=com.dtzq \
-DarchetypeArtifactId=maven-springbatch-archetype \
-DarchetypeVersion=1.4-SNAPSHOT \
-DgroupId=com.kangfoo.study.hygeia \
-DartifactId=springbatch.test \
-Dversion=1.0-SNAPSHOT \
-Dpackage=com.kangfoo.study.hygeia.springbatch.test
</pre></li>
<li><p>题外话。第一次在 github 向他人维护的项目提交代码,弄了会儿,玩转了。
主要借鉴<a href="http://site.douban.com/196781/widget/notes/12161495/note/269163206/">花20分钟写的-大白话讲解如何给github上项目贡献代码</a>
先记录在案。</p>
</li>
<li><p><a href="http://www.mkyong.com/spring-batch/spring-batch-and-quartz-scheduler-example/">spring batch + quartz</a></p>
</li>
</ol>
</div>
</article>
<article>
<header>
<h1 class="entry-title"><a href="/article/2014/03/hadoop-pipes--streaming/">Hadoop Pipes & Streaming</a></h1>
<p class="meta">
<time datetime="2014-03-03T22:26:00+08:00" pubdate>2014年03月03日</time>
| <a href="/article/2014/03/hadoop-pipes--streaming/#disqus_thread">评论</a>
</p>
</header>
<div class="entry-content"><p>申明:本文大部分出自于 <a href="http://new.osforce.cn/?mu=20140227220525KZol8ENMYdFQ6SjMveU26nEZ">开源力量</a> LouisT 老师的<a href="http://new.osforce.cn/course/101?mc101=20140301233857au7XG16o9ukfev1pmFCOfv2s">开源力量培训课-Hadoop Development</a>课件 和 Apache 官方文档。</p>
<h2>Streaming</h2>
<ul>
<li>Streaming 是 hadoop 里面提供的一个工具</li>
<li>Streaming 框架允许任何程序语言实现的程序在 Hadoop MapReduce 中使用,方便任何程序向 Hadoop 平台移植,具有很强的扩展性;</li>
<li>mapper 和 reducer 会从标准输入中读取用户数据,一行一行处理后发送给标准输出。Streaming 工具会创建 MapReduce 作业,发送给各个 tasktracker,同时监控整个作业的执行过程;</li>
<li>如果一个文件(可执行或者脚本)作为 mapper,mapper 初始化时,每一个 mapper 任务会把该文件作为一个单独进程启动,mapper 任务运行时,它把输入切法成行并把每一行提供给可执行文件进程的标准输入。同 时,mapper 收集可执行文件进程标准输出的内容,并把收到的每一行内容转化成 key/value,作为 mapper的输出。默认情况下,一行中第一个 tab 之前的部分作为 key,之后的(不包括)作为value。如果没有 tab,整行作为 key 值,value值为null。对于reducer,类似;</li>
</ul>
<h3>Streaming 优点</h3>
<ol>
<li><p>开发效率高,便于移植。Hadoop Streaming 使用 Unix 标准流作为 Hadoop 和应用程序之间的接口。在单机上可按照 cat input | mapper | sort | reducer > output 进行测试,若单机上测试通过,集群上一般控制好内存也可以很好的执行成功。</p>
</li>
<li><p>提高运行效率。对内存要求较高,可用C/C++控制内存。比纯java实现更好。</p>
</li>
</ol>
<h3>Streaming缺点</h3>
<ol>
<li><p>Hadoop Streaming 默认只能处理文本数据,(0.21.0之后可以处理二进制数据)。</p>
</li>
<li><p>Steaming 中的 mapper 和 reducer 默认只能想标准输出写数据,不能方便的多路输出。</p>
</li>
</ol>
<p>更详细内容请参考于: http://hadoop.apache.org/docs/r1.2.1/streaming.html</p>
<pre class='brush:shell'>$HADOOP_HOME/bin/hadoop jar $HADOOP_HOME/hadoop-streaming.jar \
-input myInputDirs \
-output myOutputDir \
-mapper /bin/cat \
-reducer /bin/wc
</pre><h3>streaming示例</h3>
<p>perl 语言的<a href="https://github.com/kangfoo/hadoop1.study/tree/master/kangfoo/study.hdfs/src/main/java/com/kangfoo/study/hadoop1/streaming">streaming示例</a> 代码</p>
<pre class='brush:perl'>-rw-rw-r--. 1 hadoop hadoop 48 2月 22 10:47 data
-rw-rw-r--. 1 hadoop hadoop 107399 2月 22 10:41 hadoop-streaming-1.2.1.jar
-rw-rw-r--. 1 hadoop hadoop 186 2月 22 10:45 mapper.pl
-rw-rw-r--. 1 hadoop hadoop 297 2月 22 10:55 reducer.pl
##
$ ../bin/hadoop jar hadoop-streaming-1.2.1.jar -mapper mapper.pl -reducer reducer.pl -input /test/streaming -output /test/streamingout1 -file mapper.pl -file reducer.pl
</pre><h2>Hadoop pipes</h2>
<ol>
<li>Hadoop pipes 是 Hadoop MapReduce 的 C++ 的接口代称。不同于使用标准输入和输出来实现 map 代码和 reduce 代码之间的 Streaming。</li>
<li>Pipes 使用套接字 socket 作为 tasktracker 与 C++ 版本函数的进程间的通讯,未使用 JNI。</li>
<li>与 Streaming 不同,Pipes 是 Socket 通讯,Streaming 是标准输入输出。</li>
</ol>
<h3>编译 Hadoop Pipes</h3>
<p>编译c++ pipes( 确保操作系统提前安装好了 openssl,zlib,glib,openssl-devel)
Hadoop更目录下执行
ant -Dcompile.c++=yes examples</p>
<p>具体请参见《Hadoop Pipes 编译》</p>
<h3>Hadoop官方示例:</h3>
<pre class='brush:shell'>hadoop/src/examples/pipes/impl
config.h.in
sort.cc
wordcount-nopipe.cc
wordcount-part.cc
wordcount-simple.cc
</pre><p>运行前需要把可执行文件和输入数据上传到 hdfs:</p>
<pre class='brush:shell'>$ ./bin/hadoop fs -mkdir /test/pipes/input
$ ./bin/hadoop fs -put a.txt /test/pipes/input
$ ./bin/hadoop fs -cat /test/pipes/input/a.txt
hello hadoop hello hive hello hbase hello zk
</pre><p>上传执行文件,重新命名为/test/pipes/exec</p>
<pre class='brush:shell'>$ ./bin/hadoop fs -put ./build/c++-examples/Linux-amd64-64/bin/wordcount-simple /test/pipes/exec
</pre><p>在编译好的文件夹目录下执行</p>
<pre>$ cd hadoop/build/c++-examples/Linux-amd64-64/bin
$ ../../../../bin/hadoop pipes -Dhadoop.pipes.java.recordreader=true -Dhadoop.pipes.java.recordwriter=true -reduces 4 -input /test/pipes/input -output /test/pipes/input/output1 -program /test/pipes/execs
</pre><p>执行结果如下:</p>
<pre>$ ./bin/hadoop fs -cat /test/pipes/input/output1/part-00000 hbase 1
$ ./bin/hadoop fs -cat /test/pipes/input/output1/part-00001 hello 4 hive 1
$ ./bin/hadoop fs -cat /test/pipes/input/output1/part-00002 hadoop 1 zk 1
$ ./bin/hadoop fs -cat /test/pipes/input/output1/part-00003
</pre><h3>参考博客:</h3>
<ul>
<li><a href="http://dongxicheng.org/mapreduce/hadoop-pipes-programming/">Hadoop pipes编程</a></li>
<li><a href="http://hongweiyi.com/2012/05/hadoop-pipes-src/">Hadoop Pipes运行机制</a></li>
</ul>
</div>
</article>
<article>
<header>
<h1 class="entry-title"><a href="/article/2014/03/hadoop-mapreduce-sort/">Hadoop MapReduce Sort</a></h1>
<p class="meta">
<time datetime="2014-03-03T22:24:00+08:00" pubdate>2014年03月03日</time>
| <a href="/article/2014/03/hadoop-mapreduce-sort/#disqus_thread">评论</a>
</p>
</header>
<div class="entry-content"><p>排序是 MapReduce 的核心。排序可分为四种排序:普通排序、部分排序、全局排序、辅助排序</p>
<h2>普通排序</h2>
<p>Mapreduce 本身自带排序功能;Text 对象是不适合排序的;IntWritable,LongWritable 等实现了WritableComparable 类型的对象都是可以排序的。</p>
<h2>部分排序</h2>
<p>map 和 reduce 处理过程中包含了默认对 key 的排序,那么如果不要求全排序,可以直接把结果输出,每个输出文件中包含的就是按照key执行排序的结果。</p>
<h3>控制排序顺序</h3>
<p>键的排序是由 RawComparator 控制的,规则如下:</p>
<ol>
<li>若属性 mapred.output.key.comparator.class 已设置,则使用该类的实例。调用 JobConf 的 setOutputKeyComparatorClass() 方法进行设置。</li>
<li>否则,键必须是 WritableComparable 的子类,并使用针对该键类的已登记的 comparator.</li>
<li>如果没有已登记的 comparator ,则使用 RawComparator 将字节流反序列化为一个对象,再由 WritableComparable 的 compareTo() 方法进行操作。</li>
</ol>
<h2>全局排序(对所有数据排序)</h2>
<p>Hadoop 没有提供全局数据排序,而全局排序是非常普遍的需求。</p>
<h3>实现方案</h3>
<ul>
<li>首先,创建一系列的排好序的文件;</li>
<li>其次,串联这些文件;</li>
<li>最后,生成一个全局排序的文件。</li>
</ul>
<p>主要思路是使用一个partitioner来描述全局排序的输出。该方法关键在于如何划分各个分区。</p>
<p>例,对整数排序,[0,10000] 的在 partition 0 中,(10000,20000] 在 partition 1 中… …即第n个reduce 所分配到的数据全部大于第 n-1 个 reduce 中的数据。每个 reduce 的结果都是有序的。</br>
然后再将所有的输出文件顺序合并成一个大的文件,那么就实现了全局排序。</p>
<p>在比较理想的数据分布均匀的情况下,每个分区内的数据量要基本相同。</p>
<p>但实际中数据往往分布不均匀,出现数据倾斜,这时按照此方法进行的分区划分数据就不适用,可对数据进行采样。</p>
<h3>采样器</h3>
<p>通过对 key 空间进行采样,可以较为均匀的划分数据集。采样的核心思想是只查看一小部分键,获取键的相似分布,并由此构建分区。采样器是在 map 阶段之前进行的, 在提交 job 的 client 端完成的。</p>
<h4>Sampler接口</h4>
<p>Sampler 接口是 Hadoop 的采样器,它的 getSample() 方法返回一组样本。此接口一般不由客户端调用,而是由 InputSampler 类的静态方法 writePartitionFile() 调用,以创建一个顺序文件来存储定义分区的键。</p>
<p>Sampler接口声明如下:</p>
<pre class='brush:java'> public interface Sampler<K,V> {
K[] getSample(InputFormat<K,V> inf, JobConf job) throws IOException;
}
</pre><p>继承 Sample 的类还有 IntervalSampler 间隔采样器,RandomSampler 随机采样器,SplitSampler 分词采样器。它们都是 InputSampler 的静态内部类。</p>
<p>getSample() 方法根据 job 的配置信息以及输入格式获得抽样结果,三个采样类各自有不同的实现。</p>
<p><strong>IntervalSampler 根据一定的间隔从 s 个分区中采样数据,非常适合对排好序的数据采样。</strong></p>
<pre class='brush:java'>public static class IntervalSampler<K,V> implements Sampler<K,V> {
private final double freq;// 哪一条记录被选中的概率
private final int maxSplitsSampled;// 采样的最大分区数
/**
* For each split sampled, emit when the ratio of the number of records
* retained to the total record count is less than the specified
* frequency.
*/
@SuppressWarnings("unchecked") // ArrayList::toArray doesn't preserve type
public K[] getSample(InputFormat<K,V> inf, JobConf job) throws IOException {
InputSplit[] splits = inf.getSplits(job, job.getNumMapTasks());// 1. 得到输入分区数组
ArrayList<K> samples = new ArrayList<K>();
int splitsToSample = Math.min(maxSplitsSampled, splits.length);
int splitStep = splits.length / splitsToSample; // 2. 分区采样时的间隔splitStep = 输入分区总数 除以 splitsToSample的 商;
long records = 0;
long kept = 0;
for (int i = 0; i < splitsToSample; ++i) {
RecordReader<K,V> reader = inf.getRecordReader(splits[i * splitStep], // 3. 采样下标为i * splitStep的数据
job, Reporter.NULL);
K key = reader.createKey();
V value = reader.createValue();
while (reader.next(key, value)) {// 6. 循环读取下一条记录
++records;
if ((double) kept / records < freq) { // 4. 如果当前样本数与已经读取的记录数的比值小于freq,则将这条记录添加到样本集合
++kept;
samples.add(key);// 5. 将记录添加到样本集合中
key = reader.createKey();
}
}
reader.close();
}
return (K[])samples.toArray();
}
}
… …
}
</pre><p><strong>RandomSampler 是常用的采样器,它随机地从输入数据中抽取 Key</strong>。</p>
<pre class='brush:java'> public static class RandomSampler<K,V> implements Sampler<K,V> {
private double freq;// 一个Key被选中的 概率
private final int numSamples;// 从所有被选中的分区中获得的总共的样本数目
private final int maxSplitsSampled;// 需要检查扫描的最大分区数目
/**
* Randomize the split order, then take the specified number of keys from
* each split sampled, where each key is selected with the specified
* probability and possibly replaced by a subsequently selected key when
* the quota of keys from that split is satisfied.
*/
@SuppressWarnings("unchecked") // ArrayList::toArray doesn't preserve type
public K[] getSample(InputFormat<K,V> inf, JobConf job) throws IOException {
InputSplit[] splits = inf.getSplits(job, job.getNumMapTasks());// 1. 获取所有的输入分区
ArrayList<K> samples = new ArrayList<K>(numSamples);// 2. 确定需要抽样扫描的分区数目
int splitsToSample = Math.min(maxSplitsSampled, splits.length);// 3. 取最小的为采样的分区数
Random r = new Random();
long seed = r.nextLong();
r.setSeed(seed);
LOG.debug("seed: " + seed);
// shuffle splits 4. 对输入分区数组shuffle排序
for (int i = 0; i < splits.length; ++i) {
InputSplit tmp = splits[i];
int j = r.nextInt(splits.length);// 5. 打乱其原始顺序
splits[i] = splits[j];
splits[j] = tmp;
}
// our target rate is in terms of the maximum number of sample splits,
// but we accept the possibility of sampling additional splits to hit
// the target sample keyset
// 5. 然后循环逐 个扫描每个分区中的记录进行采样,
for (int i = 0; i < splitsToSample ||
(i < splits.length && samples.size() < numSamples); ++i) {
RecordReader<K,V> reader = inf.getRecordReader(splits[i], job,
Reporter.NULL);
// 6. 取出一条记录
K key = reader.createKey();
V value = reader.createValue();
while (reader.next(key, value)) {
if (r.nextDouble() <= freq) {
if (samples.size() < numSamples) {// 7. 判断当前的采样数是否小于最大采样数
samples.add(key); //8. 小于则这条记录被选中,放进采样集合中,
} else {
// When exceeding the maximum number of samples, replace a
// random element with this one, then adjust the frequency
// to reflect the possibility of existing elements being
// pushed out
int ind = r.nextInt(numSamples);// 9. 从[0,numSamples]中选择一个随机数
if (ind != numSamples) {
samples.set(ind, key);// 10. 替换掉采样集合随机数对应位置的记录,
}
freq *= (numSamples - 1) / (double) numSamples;// 11. 调小频率
}
key = reader.createKey();// 12. 下一条纪录的key
}
}
reader.close();
}
return (K[])samples.toArray();// 13. 返回
}
}
… …
}
</pre><p><strong>SplitSampler 从 s 个分区中采样前 n 个记录,是采样随机数据的一种简便方式。</strong></p>
<pre class='brush:java'> public static class SplitSampler<K,V> implements Sampler<K,V> {
private final int numSamples;// 最大采样数
private final int maxSplitsSampled;// 最大分区数
… …
/**
* From each split sampled, take the first numSamples / numSplits records.
*/
@SuppressWarnings("unchecked") // ArrayList::toArray doesn't preserve type
public K[] getSample(InputFormat<K,V> inf, JobConf job) throws IOException {
InputSplit[] splits = inf.getSplits(job, job.getNumMapTasks());
ArrayList<K> samples = new ArrayList<K>(numSamples);
int splitsToSample = Math.min(maxSplitsSampled, splits.length);// 1. 采样的分区数
int splitStep = splits.length / splitsToSample; // 2. 分区采样时的间隔 = 分片的长度 与 输入分片的总数的 商
int samplesPerSplit = numSamples / splitsToSample; // 3. 每个分区的采样数
long records = 0;
for (int i = 0; i < splitsToSample; ++i) {
RecordReader<K,V> reader = inf.getRecordReader(splits[i * splitStep], // 4.采样下标为i * splitStep的数据
job, Reporter.NULL);
K key = reader.createKey();
V value = reader.createValue();
while (reader.next(key, value)) {
samples.add(key);// 5. 将记录添加到样本集合中
key = reader.createKey();
++records;
if ((i+1) * samplesPerSplit <= records) { // 6. 当前样本数大于当前的采样分区所需要的样本数,则停止对当前分区的采样。
break;
}
}
reader.close();
}
return (K[])samples.toArray();
}
}
</pre><p><strong>Hadoop为顺序文件提供了一个 TotalOrderPartitioner 类,可以用来实现全局排序</strong>;TotalOrderPartitioner 源代码理解。TotalOrderPartitioner 内部定义了多个字典树(内部类)。</p>
<pre class='brush:java'>interface Node<T>
// 特里树,利用字符串的公共前缀来节约存储空间,最大限度地减少无谓的字符串比较,查询效率比哈希表高
static abstract class TrieNode implements Node<BinaryComparable>
static class InnerTrieNode extends TrieNode
static class LeafTrieNode extends TrieNode
… …
</pre><p>由 TotalOrderPartitioner 调用 getPartition() 方法返回分区,由 buildTrieRec() 构建特里树.</p>
<pre class='brush:java'> private TrieNode buildTrieRec(BinaryComparable[] splits, int lower,
int upper, byte[] prefix, int maxDepth, CarriedTrieNodeRef ref) {
… …
}
</pre><h4>采样器使用示例</h4>
<ol>
<li>新建文件,名为 random.txt,里面每行存放一个数据。可由 RandomGenerator 类生成准备数据</li>
<li>执行 TestTotalOrderPartitioner.java</li>
</ol>
<h2>辅助排序</h2>
<p>先按 key 排序,在按 相同的 key 不同的 value 再排序。可实现对值分组的效果。</p>
<ul>
<li>可参考博客 <a href="http://heipark.iteye.com/blog/1990237">Hadoop二次排序关键点和出现时机(也叫辅助排序、Secondary Sort)</a></li>
<li>或者 hadoop example 工程下参考 SecondarySort.java</li>
</ul>
</div>
</article>
<article>
<header>
<h1 class="entry-title"><a href="/article/2014/03/hadoop-mapreduce-join/">Hadoop MapReduce Join</a></h1>
<p class="meta">
<time datetime="2014-03-03T22:23:00+08:00" pubdate>2014年03月03日</time>
| <a href="/article/2014/03/hadoop-mapreduce-join/#disqus_thread">评论</a>
</p>
</header>
<div class="entry-content"><p>在 Hadoop 中可以通过 MapReduce,Pig,hive,Cascading编程进行大型数据集间的连接操作。连接操作如果由 Mapper 执行,则称为“map端连接”;如果由 Reduce 执行,则称为“Reduce端连接”。</p>
<p>连接操作的具体实现技术取决于数据集的规模以及分区方式。</br>
若一个数据集很大而另一个数据集很小,以至于可以分发到集群中的每一个节点之中,则可以执行一个 MapReduce 作业,将各个数据集的数据放到一起,从而实现连接。</br>
若两个数据规模均很大,没有哪个数据集可以完全复制到集群的每个节点,可以使用 MapReduce 作业进行连接,使用 Map 端连接还是 Reduce 端连接取决于数据的组织方式。</br></p>
<p>Map端连接将所有的工作在 map 中操作,效率高但是不通用。而 Reduce 端连接利用了 shuff 机制,进行连接,效率不高。</p>
<p>DistributedCache 能够在任务运行过程中及时地将文件和存档复制到任务节点进行本地缓存以供使用。各个文件通常只复制到一个节点一次。可用 api 或者命令行在需要的时候将本地文件添加到 hdfs 文件系统中。</p>
<p>本文中的示例 <strong>出自于 <a href="http://new.osforce.cn/?mu=20140227220525KZol8ENMYdFQ6SjMveU26nEZ">开源力量</a> LouisT 老师的<a href="http://new.osforce.cn/course/101?mc101=20140301233857au7XG16o9ukfev1pmFCOfv2s">开源力量培训课-Hadoop Development</a>课件。</strong></p>
<h3>Map端连接</h3>
<p>Map 端联接是指数据到达 map 处理函数之前进行合并的。它要求 map 的输入数据必须先分区并以特定的方式排序。各个输入数据集被划分成相同数量的分区,并均按相同的键排序(连接键)。同一键的所有输入纪录均会放在同一个分区。以满足 MapReduce 作业的输出。</p>
<p>若作业的 Reduce 数量相同、键相同、输入文件是不可切分的,那么 map 端连接操作可以连接多个作业的输出。</p>
<p>在 Map 端连接效率比 Reduce 端连接效率高(Reduce端Shuff耗时),但是要求比较苛刻。</p>
<h4>基本思路</h4>
<ol>
<li>将需要 join 的两个文件,一个存储在 HDFS 中,一个使用 DistributedCache.addCacheFile() 将需要 join 另一个文件加入到所有 Map 的缓存里(DistributedCache.addCacheFile() 需要在作业提交前设置);</li>
<li>在 Map 函数里读取该文件,进行 Join;</li>
<li>将结果输出到 reduce 端;</li>
</ol>
<h4>使用步骤</h4>
<ol>
<li>在 HDFS 中上传文件(文本文件、压缩文件、jar包等);</li>
<li>调用相关API添加文件信息;</li>
<li>task运行前直接调用文件读写API获取文件;</li>
</ol>
<h3>Reduce端Join</h3>
<p>reduce 端联接比 map 端联接更普遍,因为输入的数据不需要特定的结构;效率低(所有数据必须经过shuffle过程)。</p>
<h4>基本思路</h4>
<ol>
<li>Map 端读取所有文件,并在输出的内容里加上标识代表数据是从哪个文件里来的;</li>
<li>在 reduce 处理函数里,对按照标识对数据进行保存;</li>
<li>然后根据 Key 的 Join 来求出结果直接输出;</li>
</ol>
<h3>示例程序</h3>
<p>使用 MapReduce map 端join 或者 reduce 端 join 实现如下两张表 emp, dep 中的 SQL 联合查询的数据效果。</p>
<pre class='brush:text'>Table EMP:(新建文件EMP,第一行属性名不要)
----------------------------------------
Name Sex Age DepNo
zhang male 20 1
li female 25 2
wang female 30 3
zhou male 35 2
----------------------------------------
Table Dep:(新建文件DEP,第一行属性名不要)
DepNo DepName
1 Sales
2 Dev
3 Mgt
------------------------------------------------------------
SQL:
select name,sex ,age, depName from emp inner join DEP on EMP.DepNo = Dep.DepNo
----------------------------------------
实现效果:
$ ./bin/hadoop fs -cat /reduceSideJoin/output11/part-r-00000
zhang male 20 sales
li female 25 dev
wang female 30 dev
zhou male 35 dev
</pre><p>Map 端 Join 的例子:<a href="https://github.com/kangfoo/hadoop1.study/blob/master/kangfoo/study.hdfs/src/main/java/com/kangfoo/study/hadoop1/mp/join/TestMapSideJoin.java">TestMapSideJoin</a> </br>
Reduce 端 Join 的例子:<a href="https://github.com/kangfoo/hadoop1.study/blob/master/kangfoo/study.hdfs/src/main/java/com/kangfoo/study/hadoop1/mp/join/TestReduceSideJoin.java">TestReduceSideJoin</a> </br></p>
</div>
</article>
<article>
<header>
<h1 class="entry-title"><a href="/article/2014/03/hadoop-mapreduce--ji-shu-qi/">Hadoop MapReduce 计数器</a></h1>
<p class="meta">
<time datetime="2014-03-03T22:22:00+08:00" pubdate>2014年03月03日</time>
| <a href="/article/2014/03/hadoop-mapreduce--ji-shu-qi/#disqus_thread">评论</a>
</p>
</header>
<div class="entry-content"><p>计数器是一种收集系统信息有效手段,用于质量控制或应用级统计。可辅助诊断系统故障。计数器可以比日志更方便的统计事件发生次数。</p>
<h3>内置计数器</h3>
<p>Hadoop 为每个作业维护若干内置计数器,主要用来记录作业的执行情况。</p>
<h4>内置计数器包括</h4>
<ul>
<li>MapReduce 框架计数器(Map-Reduce Framework)</li>
<li>文件系统计数器(FielSystemCounters)</li>
<li>作业计数器(Job Counters)</li>
<li>文件输入格式计数器(File Output Format Counters)</li>
<li>文件输出格式计数器(File Input Format Counters)</li>
</ul>
<!--表格数据太多,暂缓。 TODO-->
<!--<div style="height:0px;border-bottom:1px dashed red"></div>
<table width="100%" border="1" cellpadding="3" cellspacing="0" bordercolor="#eeeeee">
<tbody>
<tr>
<td><em>组别 </em></td>
<td><em>计数器名称 </em></td>
<td><em>说明 </em></td>
</tr>
<tr>
<th rowspan=13>Map-Reduce <br>Framework </th>
<td>Map input records </td>
<td>作业中所有的 map 已处理的输入纪录数。每次 RecordReader 读到一条纪录并将其传递给 map 的 map() 函数时,此计数器的值增加 </td>
</tr>
<tr>
<td>Map skipped records </td>
<td>作业中所有 map 跳过的输入纪录数。 </td>
</tr>
</tbody>
</table>
-->
<p>计数器由其关联的 task 进行维护,定期传递给 tasktracker,再由 tasktracker 传给 jobtracker。因此,计数器能够被全局地聚集。内置计数器实际由 jobtracker 维护,不必在整个网络发送。</p>
<p>一个任务的计数器值每次都是完整传输的,仅当一个作业执行成功之后,计数器的值才完整可靠的。</p>
<h3>自定义Java计数器</h3>
<p>MapReduce 允许用户自定义计数器,MapReduce 框架将跨所有 map 和 reduce 聚集这些计数器,并在作业结束的时候产生一个最终的结果。</p>
<p>计数器的值可以在 mapper 或者 reducer 中添加。多个计数器可以由一个 java 枚举类型来定义,以便对计数器分组。一个作业可以定义的枚举类型数量不限,个个枚举类型所包含的数量也不限。</p>
<p>枚举类型的名称即为组的名称,枚举类型的字段即为计数器名称。</p>
<p>在 TaskInputOutputContext 中的 counter</p>
<pre class='brush:java'> public Counter getCounter(Enum<?> counterName) {
return reporter.getCounter(counterName);
}
public Counter getCounter(String groupName, String counterName) {
return reporter.getCounter(groupName, counterName);
}
</pre><h4>计数器递增</h4>
<p>org.apache.hadoop.mapreduce.Counter类</p>
<pre class='brush:java'> public synchronized void increment(long incr) {
value += incr;
}
</pre><h4>计数器使用</h4>
<ul>
<li>WebUI 查看(50030);</li>
<li>命令行方式:hadoop job [-counter <job-id> <group-name> <counter-name>];</li>
<li>使用Hadoop API。
通过job.getCounters()得到Counters,而后调用counters.findCounter()方法去得到计数器对象;可参见《Hadoop权威指南》第8章 示例 8-2 MissingTemperaureFields.java</li>
</ul>
<h4>命令行方式示例</h4>
<pre class='brush:shell'>$ ./bin/hadoop job -counter job_201402211848_0004 FileSystemCounters HDFS_BYTES_READ
177
</pre><h3>自定义计数器</h3>
<p>统计词汇行中词汇数超过2个或少于2个的行数。 源代码: <a href="https://github.com/kangfoo/hadoop1.study/blob/master/kangfoo/study.hdfs/src/main/java/com/kangfoo/study/hadoop1/mp/counter/TestCounter.java">TestCounter.java</a>TestCounter.java</p>
<h4>输入数据文件值 counter.txt:</h4>
<pre class='brush:text'>hello world
hello
hello world 111
hello world 111 222
</pre><p>执行参数</p>
<pre class='brush:java'>hdfs://master11:9000/counter/input/a.txt hdfs://master11:9000/counter/output1
</pre><p>计数器统计(hadoop eclipse 插件执行)结果:</p>
<pre class='brush:shell'>2014-02-21 00:03:38,676 INFO mapred.JobClient (Counters.java:log(587)) - ERROR_COUNTER
2014-02-21 00:03:38,677 INFO mapred.JobClient (Counters.java:log(589)) - Above_2=2
2014-02-21 00:03:38,677 INFO mapred.JobClient (Counters.java:log(589)) - BELOW_2=1
</pre></div>
</article>
<div class="pagination">
<span class="pagebar">
<span class="nolink-page" title="第 1 页,共 6 页">1/6</span>
<span class="current-page">1</span>
<a rel="full-article" href="/page/2/">2</a>
<a rel="full-article" href="/page/3/">3</a>
<span class="nolink-page">...</span>
<a rel="full-article" href="/page/6/">6</a>
<a rel="full-article" href="/page/2/" title="下一页">></a>
</span>
<a class="next" href="/archives">文章目录</a> </div>
</div>
<aside class="sidebar">
<section>
<h1>近期文章</h1>
<ul id="recent_posts">
<li class="post">
<a href="/article/2014/04/spring-batch--ru-men/">Springbatch入门</a>
</li>
<li class="post">
<a href="/article/2014/03/hadoop-pipes--streaming/">Hadoop Pipes & Streaming</a>
</li>
<li class="post">
<a href="/article/2014/03/hadoop-mapreduce-sort/">Hadoop MapReduce Sort</a>
</li>
<li class="post">
<a href="/article/2014/03/hadoop-mapreduce-join/">Hadoop MapReduce Join</a>
</li>
<li class="post">
<a href="/article/2014/03/hadoop-mapreduce--ji-shu-qi/">Hadoop MapReduce 计数器</a>
</li>
<li class="post">
<a href="/article/2014/03/hadoop-mapreduce-recordreader-zu-jian/">Hadoop MapReduce RecordReader 组件</a>
</li>
<li class="post">
<a href="/article/2014/03/hadoop-mapreduce-partitioner--zu-jian/">Hadoop MapReduce Partitioner 组件</a>
</li>
<li class="post">
<a href="/article/2014/03/hadoop-mapreduce-combiner--zu-jian/">Hadoop MapReduce Combiner 组件</a>
</li>
<li class="post">
<a href="/article/2014/03/hadoop-mapreduce--lei-xing-yu-ge-shi/">Hadoop MapReduce 类型与格式</a>
</li>
<li class="post">
<a href="/article/2014/03/hadoop-mapreduce--gong-zuo-ji-zhi/">Hadoop MapReduce 工作机制</a>
</li>
</ul>
</section>
<section>
<h2>近期评论</h2>
<script language="JavaScript">
<!--
var is_https = ('https:' == document.location.protocol);
var rcw_script_src = (is_https ? 'https:' : 'http:') + '//kangaroo.disqus.com/recent_comments_widget.js?num_items=5&excerpt_length=100&hide_avatars=' + (is_https ? '1' : '0&avatar_size=32');
var rcw_script = '<scr' + 'ipt type="text/javascript" src="' + rcw_script_src + '"></scr' + 'ipt>';
document.writeln(rcw_script);
//-->
</script>
</section>
</aside>
</div>
</div>
<footer role="contentinfo"><p>
版权所有 © 2015 - kangfoo -
<span class="credit">Powered by <a href="http://www.opoopress.com/">OpooPress</a></span>
<script type="text/javascript">var cnzz_protocol = (("https:" == document.location.protocol) ? " https://" : " http://");document.write(unescape("%3Cspan id='cnzz_stat_icon_1000232528'%3E%3C/span%3E%3Cscript src='" + cnzz_protocol + "s22.cnzz.com/z_stat.php%3Fid%3D1000232528%26show%3Dpic' type='text/javascript'%3E%3C/script%3E"));</script>
</p>
</footer>
<script type="text/javascript" src="/javascripts/opoopress.min.js"></script>
<script language="JavaScript">
<!--
window.OpooPress = new OpooPressApp({siteUrl:'http://kangfoo.u.qiniudn.com/',rootUrl:'',pageUrl:'/',refreshRelativeTimes:true,verbose:true},{});
OpooPress.init();
var disqus_shortname = 'kangaroo';
OpooPress.showDisqusCommentCount();
//-->
</script>
<!-- START: Syntax Highlighter ComPress -->
<script type="text/javascript" src="/plugins/syntax-highlighter/scripts/shCore.js"></script>
<script type="text/javascript" src="/plugins/syntax-highlighter/scripts/shAutoloader.js"></script>
<script type="text/javascript">
SyntaxHighlighter.autoloader(
'applescript /plugins/syntax-highlighter/scripts/shBrushAppleScript.js',
'actionscript3 as3 /plugins/syntax-highlighter/scripts/shBrushAS3.js',
'bash shell /plugins/syntax-highlighter/scripts/shBrushBash.js',
'coldfusion cf /plugins/syntax-highlighter/scripts/shBrushColdFusion.js',
'cpp c /plugins/syntax-highlighter/scripts/shBrushCpp.js',
'c# c-sharp csharp /plugins/syntax-highlighter/scripts/shBrushCSharp.js',
'css /plugins/syntax-highlighter/scripts/shBrushCss.js',
'delphi pascal pas /plugins/syntax-highlighter/scripts/shBrushDelphi.js',
'diff patch /plugins/syntax-highlighter/scripts/shBrushDiff.js',
'erl erlang /plugins/syntax-highlighter/scripts/shBrushErlang.js',
'groovy /plugins/syntax-highlighter/scripts/shBrushGroovy.js',
'java /plugins/syntax-highlighter/scripts/shBrushJava.js',
'jfx javafx /plugins/syntax-highlighter/scripts/shBrushJavaFX.js',
'js jscript javascript /plugins/syntax-highlighter/scripts/shBrushJScript.js',
'perl pl /plugins/syntax-highlighter/scripts/shBrushPerl.js',
'php /plugins/syntax-highlighter/scripts/shBrushPhp.js',
'text plain /plugins/syntax-highlighter/scripts/shBrushPlain.js',
'powershell ps /plugins/syntax-highlighter/scripts/shBrushPowerShell.js',
'py python /plugins/syntax-highlighter/scripts/shBrushPython.js',
'ruby rails ror rb /plugins/syntax-highlighter/scripts/shBrushRuby.js',
'sass scss /plugins/syntax-highlighter/scripts/shBrushSass.js',
'scala /plugins/syntax-highlighter/scripts/shBrushScala.js',
'sql /plugins/syntax-highlighter/scripts/shBrushSql.js',
'vb vbnet /plugins/syntax-highlighter/scripts/shBrushVb.js',
'xml xhtml xslt html /plugins/syntax-highlighter/scripts/shBrushXml.js'
);
SyntaxHighlighter.defaults['auto-links'] = false;
SyntaxHighlighter.defaults['toolbar'] = false;
SyntaxHighlighter.defaults['tab-size'] = 4;
SyntaxHighlighter.all();
</script>
<!-- END: Syntax Highlighter ComPress -->
</body>
</html>