/[Apache-SVN]/avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/mapred/TestSequenceFileReader.java
ViewVC logotype

Contents of /avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/mapred/TestSequenceFileReader.java

Parent Directory | Revision Log


Revision 1095206 - (show annotations) (download)
Tue Apr 19 20:44:51 2011 UTC (13 years ago) by cutting
File size: 8668 byte(s)
AVRO-802. Java: Add documentation for non-Avro input, map-only jobs.
1 /**
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 */
18 package org.apache.avro.mapred;
19
20 import static org.junit.Assert.assertEquals;
21
22 import java.io.IOException;
23 import java.io.File;
24 import java.net.URI;
25 import java.util.Iterator;
26
27 import org.apache.hadoop.io.SequenceFile;
28 import org.apache.hadoop.conf.Configuration;
29 import org.apache.hadoop.fs.FileSystem;
30 import org.apache.hadoop.fs.Path;
31 import org.apache.hadoop.io.LongWritable;
32 import org.apache.hadoop.io.Text;
33 import org.apache.hadoop.io.NullWritable;
34 import org.apache.hadoop.mapred.JobClient;
35 import org.apache.hadoop.mapred.JobConf;
36 import org.apache.hadoop.mapred.FileInputFormat;
37 import org.apache.hadoop.mapred.FileOutputFormat;
38 import org.apache.hadoop.mapred.MapReduceBase;
39 import org.apache.hadoop.mapred.Mapper;
40 import org.apache.hadoop.mapred.Reducer;
41 import org.apache.hadoop.mapred.OutputCollector;
42 import org.apache.hadoop.mapred.Reporter;
43 import org.apache.hadoop.mapred.SequenceFileInputFormat;
44 import org.apache.hadoop.mapred.SequenceFileOutputFormat;
45
46 import org.apache.avro.Schema;
47 import org.apache.avro.file.FileReader;
48 import org.apache.avro.file.DataFileReader;
49 import org.apache.avro.specific.SpecificDatumReader;
50 import org.apache.avro.util.Utf8;
51
52 import org.junit.BeforeClass;
53 import org.junit.Test;
54
55 public class TestSequenceFileReader {
56 private static final int COUNT =
57 Integer.parseInt(System.getProperty("test.count", "10"));
58 private static final File DIR
59 = new File(System.getProperty("test.dir", "."));
60 private static final File FILE = new File(DIR, "test.seq");
61
62 private static final Schema SCHEMA
63 = Pair.getPairSchema(Schema.create(Schema.Type.LONG),
64 Schema.create(Schema.Type.STRING));
65
66 @BeforeClass
67 public static void testWriteSequenceFile() throws IOException {
68 FILE.delete();
69 Configuration c = new Configuration();
70 URI uri = FILE.toURI();
71 SequenceFile.Writer writer
72 = new SequenceFile.Writer(FileSystem.get(uri, c), c,
73 new Path(uri.toString()),
74 LongWritable.class, Text.class);
75 final LongWritable key = new LongWritable();
76 final Text val = new Text();
77 for (int i = 0; i < COUNT; ++i) {
78 key.set(i);
79 val.set(Integer.toString(i));
80 writer.append(key, val);
81 }
82 writer.close();
83 }
84
85 @Test
86 public void testReadSequenceFile() throws Exception {
87 checkFile(new SequenceFileReader<Long,CharSequence>(FILE));
88 }
89
90 public void checkFile(FileReader<Pair<Long,CharSequence>> reader) throws Exception {
91 long i = 0;
92 for (Pair<Long,CharSequence> p : reader) {
93 assertEquals((Long)i, p.key());
94 assertEquals(Long.toString(i), p.value().toString());
95 i++;
96 }
97 assertEquals(COUNT, i);
98 reader.close();
99 }
100
101 @Test
102 public void testSequenceFileInputFormat() throws Exception {
103 JobConf job = new JobConf();
104 Path output = new Path(System.getProperty("test.dir",".")+"/seq-out");
105
106 output.getFileSystem(job).delete(output);
107
108 // configure input for Avro from sequence file
109 AvroJob.setInputSequenceFile(job);
110 FileInputFormat.setInputPaths(job, FILE.toURI().toString());
111 AvroJob.setInputSchema(job, SCHEMA);
112
113 // mapper is default, identity
114 // reducer is default, identity
115
116 // configure output for avro
117 AvroJob.setOutputSchema(job, SCHEMA);
118 FileOutputFormat.setOutputPath(job, output);
119
120 JobClient.runJob(job);
121
122 checkFile(new DataFileReader<Pair<Long,CharSequence>>
123 (new File(output.toString()+"/part-00000.avro"),
124 new SpecificDatumReader<Pair<Long,CharSequence>>()));
125 }
126
127 private static class NonAvroMapper
128 extends MapReduceBase
129 implements Mapper<LongWritable,Text,AvroKey<Long>,AvroValue<Utf8>> {
130
131 public void map(LongWritable key, Text value,
132 OutputCollector<AvroKey<Long>,AvroValue<Utf8>> out,
133 Reporter reporter) throws IOException {
134 out.collect(new AvroKey<Long>(key.get()),
135 new AvroValue<Utf8>(new Utf8(value.toString())));
136 }
137 }
138
139 @Test
140 public void testNonAvroMapper() throws Exception {
141 JobConf job = new JobConf();
142 Path output = new Path(System.getProperty("test.dir",".")+"/seq-out");
143
144 output.getFileSystem(job).delete(output);
145
146 // configure input for non-Avro sequence file
147 job.setInputFormat(SequenceFileInputFormat.class);
148 FileInputFormat.setInputPaths(job, FILE.toURI().toString());
149
150 // use a hadoop mapper that emits Avro output
151 job.setMapperClass(NonAvroMapper.class);
152
153 // reducer is default, identity
154
155 // configure output for avro
156 FileOutputFormat.setOutputPath(job, output);
157 AvroJob.setOutputSchema(job, SCHEMA);
158
159 JobClient.runJob(job);
160
161 checkFile(new DataFileReader<Pair<Long,CharSequence>>
162 (new File(output.toString()+"/part-00000.avro"),
163 new SpecificDatumReader<Pair<Long,CharSequence>>()));
164 }
165
166 private static class NonAvroOnlyMapper
167 extends MapReduceBase
168 implements Mapper<LongWritable,Text,AvroWrapper<Pair<Long,Utf8>>,NullWritable> {
169
170 public void map(LongWritable key, Text value,
171 OutputCollector<AvroWrapper<Pair<Long,Utf8>>,NullWritable> out,
172 Reporter reporter) throws IOException {
173 out.collect(new AvroWrapper<Pair<Long,Utf8>>(new Pair<Long,Utf8>(key.get(), new Utf8(value.toString()))),
174 NullWritable.get());
175 }
176 }
177
178 @Test
179 public void testNonAvroMapOnly() throws Exception {
180 JobConf job = new JobConf();
181 Path output = new Path(System.getProperty("test.dir",".")+"/seq-out");
182
183 output.getFileSystem(job).delete(output);
184
185
186 // configure input for non-Avro sequence file
187 job.setInputFormat(SequenceFileInputFormat.class);
188 FileInputFormat.setInputPaths(job, FILE.toURI().toString());
189
190 // use a hadoop mapper that emits Avro output
191 job.setMapperClass(NonAvroOnlyMapper.class);
192
193 // configure output for avro
194 job.setNumReduceTasks(0); // map-only
195 FileOutputFormat.setOutputPath(job, output);
196 AvroJob.setOutputSchema(job, SCHEMA);
197
198 JobClient.runJob(job);
199
200 checkFile(new DataFileReader<Pair<Long,CharSequence>>
201 (new File(output.toString()+"/part-00000.avro"),
202 new SpecificDatumReader<Pair<Long,CharSequence>>()));
203 }
204
205 private static class NonAvroReducer
206 extends MapReduceBase
207 implements Reducer<AvroKey<Long>,AvroValue<Utf8>,LongWritable,Text> {
208
209 public void reduce(AvroKey<Long> key, Iterator<AvroValue<Utf8>> values,
210 OutputCollector<LongWritable, Text> out,
211 Reporter reporter) throws IOException {
212 while (values.hasNext()) {
213 AvroValue<Utf8> value = values.next();
214 out.collect(new LongWritable(key.datum()),
215 new Text(value.datum().toString()));
216 }
217 }
218 }
219
220 @Test
221 public void testNonAvroReducer() throws Exception {
222 JobConf job = new JobConf();
223 Path output = new Path(System.getProperty("test.dir",".")+"/seq-out");
224
225 output.getFileSystem(job).delete(output);
226
227 // configure input for Avro from sequence file
228 AvroJob.setInputSequenceFile(job);
229 AvroJob.setInputSchema(job, SCHEMA);
230 FileInputFormat.setInputPaths(job, FILE.toURI().toString());
231
232 // mapper is default, identity
233
234 // use a hadoop reducer that consumes Avro input
235 AvroJob.setMapOutputSchema(job, SCHEMA);
236 job.setReducerClass(NonAvroReducer.class);
237
238 // configure output for non-Avro SequenceFile
239 job.setOutputFormat(SequenceFileOutputFormat.class);
240 FileOutputFormat.setOutputPath(job, output);
241
242 // output key/value classes are default, LongWritable/Text
243
244 JobClient.runJob(job);
245
246 checkFile(new SequenceFileReader<Long,CharSequence>
247 (new File(output.toString()+"/part-00000")));
248 }
249
250 }

infrastructure at apache.org
ViewVC Help
Powered by ViewVC 1.1.26