1 package org.apache.hadoop.fs.s3;
2
3 import java.io.BufferedInputStream;
4 import java.io.BufferedOutputStream;
5 import java.io.Closeable;
6 import java.io.File;
7 import java.io.FileInputStream;
8 import java.io.FileOutputStream;
9 import java.io.IOException;
10 import java.io.InputStream;
11 import java.io.OutputStream;
12 import java.net.URI;
13 import java.util.HashMap;
14 import java.util.Map;
15 import java.util.Set;
16 import java.util.TreeSet;
17
18 import org.apache.hadoop.conf.Configuration;
19 import org.apache.hadoop.fs.Path;
20 import org.apache.hadoop.fs.s3.INode.FileType;
21 import org.jets3t.service.S3Service;
22 import org.jets3t.service.S3ServiceException;
23 import org.jets3t.service.impl.rest.httpclient.RestS3Service;
24 import org.jets3t.service.model.S3Bucket;
25 import org.jets3t.service.model.S3Object;
26 import org.jets3t.service.security.AWSCredentials;
27
28 class Jets3tFileSystemStore implements FileSystemStore {
29
30 private static final String FILE_SYSTEM_NAME = "fs";
31 private static final String FILE_SYSTEM_VALUE = "Hadoop";
32
33 private static final String FILE_SYSTEM_TYPE_NAME = "fs-type";
34 private static final String FILE_SYSTEM_TYPE_VALUE = "block";
35
36 private static final String FILE_SYSTEM_VERSION_NAME = "fs-version";
37 private static final String FILE_SYSTEM_VERSION_VALUE = "1";
38
39 private static final Map<String, String> METADATA =
40 new HashMap<String, String>();
41
42 static {
43 METADATA.put(FILE_SYSTEM_NAME, FILE_SYSTEM_VALUE);
44 METADATA.put(FILE_SYSTEM_TYPE_NAME, FILE_SYSTEM_TYPE_VALUE);
45 METADATA.put(FILE_SYSTEM_VERSION_NAME, FILE_SYSTEM_VERSION_VALUE);
46 }
47
48 private static final String PATH_DELIMITER = Path.SEPARATOR;
49 private static final String BLOCK_PREFIX = "block_";
50
51 private Configuration conf;
52
53 private S3Service s3Service;
54
55 private S3Bucket bucket;
56
57 private int bufferSize;
58
59 public void initialize(URI uri, Configuration conf) throws IOException {
60
61 this.conf = conf;
62
63 S3Credentials s3Credentials = new S3Credentials();
64 s3Credentials.initialize(uri, conf);
65 try {
66 AWSCredentials awsCredentials =
67 new AWSCredentials(s3Credentials.getAccessKey(),
68 s3Credentials.getSecretAccessKey());
69 this.s3Service = new RestS3Service(awsCredentials);
70 } catch (S3ServiceException e) {
71 if (e.getCause() instanceof IOException) {
72 throw (IOException) e.getCause();
73 }
74 throw new S3Exception(e);
75 }
76 bucket = new S3Bucket(uri.getHost());
77
78 this.bufferSize = conf.getInt("io.file.buffer.size", 4096);
79 }
80
81 public String getVersion() throws IOException {
82 return FILE_SYSTEM_VERSION_VALUE;
83 }
84
85 private void delete(String key) throws IOException {
86 try {
87 s3Service.deleteObject(bucket, key);
88 } catch (S3ServiceException e) {
89 if (e.getCause() instanceof IOException) {
90 throw (IOException) e.getCause();
91 }
92 throw new S3Exception(e);
93 }
94 }
95
96 public void deleteINode(Path path) throws IOException {
97 delete(pathToKey(path));
98 }
99
100 public void deleteBlock(Block block) throws IOException {
101 delete(blockToKey(block));
102 }
103
104 public boolean inodeExists(Path path) throws IOException {
105 InputStream in = get(pathToKey(path), true);
106 if (in == null) {
107 return false;
108 }
109 in.close();
110 return true;
111 }
112
113 public boolean blockExists(long blockId) throws IOException {
114 InputStream in = get(blockToKey(blockId), false);
115 if (in == null) {
116 return false;
117 }
118 in.close();
119 return true;
120 }
121
122 private InputStream get(String key, boolean checkMetadata)
123 throws IOException {
124
125 try {
126 S3Object object = s3Service.getObject(bucket, key);
127 if (checkMetadata) {
128 checkMetadata(object);
129 }
130 return object.getDataInputStream();
131 } catch (S3ServiceException e) {
132 if ("NoSuchKey".equals(e.getS3ErrorCode())) {
133 return null;
134 }
135 if (e.getCause() instanceof IOException) {
136 throw (IOException) e.getCause();
137 }
138 throw new S3Exception(e);
139 }
140 }
141
142 private InputStream get(String key, long byteRangeStart) throws IOException {
143 try {
144 S3Object object = s3Service.getObject(bucket, key, null, null, null,
145 null, byteRangeStart, null);
146 return object.getDataInputStream();
147 } catch (S3ServiceException e) {
148 if ("NoSuchKey".equals(e.getS3ErrorCode())) {
149 return null;
150 }
151 if (e.getCause() instanceof IOException) {
152 throw (IOException) e.getCause();
153 }
154 throw new S3Exception(e);
155 }
156 }
157
158 private void checkMetadata(S3Object object) throws S3FileSystemException,
159 S3ServiceException {
160
161 String name = (String) object.getMetadata(FILE_SYSTEM_NAME);
162 if (!FILE_SYSTEM_VALUE.equals(name)) {
163 throw new S3FileSystemException("Not a Hadoop S3 file.");
164 }
165 String type = (String) object.getMetadata(FILE_SYSTEM_TYPE_NAME);
166 if (!FILE_SYSTEM_TYPE_VALUE.equals(type)) {
167 throw new S3FileSystemException("Not a block file.");
168 }
169 String dataVersion = (String) object.getMetadata(FILE_SYSTEM_VERSION_NAME);
170 if (!FILE_SYSTEM_VERSION_VALUE.equals(dataVersion)) {
171 throw new VersionMismatchException(FILE_SYSTEM_VERSION_VALUE,
172 dataVersion);
173 }
174 }
175
176 public INode retrieveINode(Path path) throws IOException {
177 return INode.deserialize(get(pathToKey(path), true));
178 }
179
180 public File retrieveBlock(Block block, long byteRangeStart)
181 throws IOException {
182 File fileBlock = null;
183 InputStream in = null;
184 OutputStream out = null;
185 try {
186 fileBlock = newBackupFile();
187 in = get(blockToKey(block), byteRangeStart);
188 out = new BufferedOutputStream(new FileOutputStream(fileBlock));
189 byte[] buf = new byte[bufferSize];
190 int numRead;
191 while ((numRead = in.read(buf)) >= 0) {
192 out.write(buf, 0, numRead);
193 }
194 return fileBlock;
195 } catch (IOException e) {
196 // close output stream to file then delete file
197 closeQuietly(out);
198 out = null; // to prevent a second close
199 if (fileBlock != null) {
200 fileBlock.delete();
201 }
202 throw e;
203 } finally {
204 closeQuietly(out);
205 closeQuietly(in);
206 }
207 }
208
209 private File newBackupFile() throws IOException {
210 File dir = new File(conf.get("fs.s3.buffer.dir"));
211 if (!dir.exists() && !dir.mkdirs()) {
212 throw new IOException("Cannot create S3 buffer directory: " + dir);
213 }
214 File result = File.createTempFile("input-", ".tmp", dir);
215 result.deleteOnExit();
216 return result;
217 }
218
219 public Set<Path> listSubPaths(Path path) throws IOException {
220 try {
221 String prefix = pathToKey(path);
222 if (!prefix.endsWith(PATH_DELIMITER)) {
223 prefix += PATH_DELIMITER;
224 }
225 S3Object[] objects = s3Service.listObjects(bucket, prefix, PATH_DELIMITER);
226 Set<Path> prefixes = new TreeSet<Path>();
227 for (int i = 0; i < objects.length; i++) {
228 prefixes.add(keyToPath(objects[i].getKey()));
229 }
230 prefixes.remove(path);
231 return prefixes;
232 } catch (S3ServiceException e) {
233 if (e.getCause() instanceof IOException) {
234 throw (IOException) e.getCause();
235 }
236 throw new S3Exception(e);
237 }
238 }
239
240 public Set<Path> listDeepSubPaths(Path path) throws IOException {
241 try {
242 String prefix = pathToKey(path);
243 if (!prefix.endsWith(PATH_DELIMITER)) {
244 prefix += PATH_DELIMITER;
245 }
246 S3Object[] objects = s3Service.listObjects(bucket, prefix, null);
247 Set<Path> prefixes = new TreeSet<Path>();
248 for (int i = 0; i < objects.length; i++) {
249 prefixes.add(keyToPath(objects[i].getKey()));
250 }
251 prefixes.remove(path);
252 return prefixes;
253 } catch (S3ServiceException e) {
254 if (e.getCause() instanceof IOException) {
255 throw (IOException) e.getCause();
256 }
257 throw new S3Exception(e);
258 }
259 }
260
261 private void put(String key, InputStream in, long length, boolean storeMetadata)
262 throws IOException {
263
264 try {
265 S3Object object = new S3Object(key);
266 object.setDataInputStream(in);
267 object.setContentType("binary/octet-stream");
268 object.setContentLength(length);
269 if (storeMetadata) {
270 object.addAllMetadata(METADATA);
271 }
272 s3Service.putObject(bucket, object);
273 } catch (S3ServiceException e) {
274 if (e.getCause() instanceof IOException) {
275 throw (IOException) e.getCause();
276 }
277 throw new S3Exception(e);
278 }
279 }
280
281 public void storeINode(Path path, INode inode) throws IOException {
282 put(pathToKey(path), inode.serialize(), inode.getSerializedLength(), true);
283 }
284
285 public void storeBlock(Block block, File file) throws IOException {
286 BufferedInputStream in = null;
287 try {
288 in = new BufferedInputStream(new FileInputStream(file));
289 put(blockToKey(block), in, block.getLength(), false);
290 } finally {
291 closeQuietly(in);
292 }
293 }
294
295 private void closeQuietly(Closeable closeable) {
296 if (closeable != null) {
297 try {
298 closeable.close();
299 } catch (IOException e) {
300 // ignore
301 }
302 }
303 }
304
305 private String pathToKey(Path path) {
306 if (!path.isAbsolute()) {
307 throw new IllegalArgumentException("Path must be absolute: " + path);
308 }
309 return path.toUri().getPath();
310 }
311
312 private Path keyToPath(String key) {
313 return new Path(key);
314 }
315
316 private String blockToKey(long blockId) {
317 return BLOCK_PREFIX + blockId;
318 }
319
320 private String blockToKey(Block block) {
321 return blockToKey(block.getId());
322 }
323
324 public void purge() throws IOException {
325 try {
326 S3Object[] objects = s3Service.listObjects(bucket);
327 for (int i = 0; i < objects.length; i++) {
328 s3Service.deleteObject(bucket, objects[i].getKey());
329 }
330 } catch (S3ServiceException e) {
331 if (e.getCause() instanceof IOException) {
332 throw (IOException) e.getCause();
333 }
334 throw new S3Exception(e);
335 }
336 }
337
338 public void dump() throws IOException {
339 StringBuilder sb = new StringBuilder("S3 Filesystem, ");
340 sb.append(bucket.getName()).append("\n");
341 try {
342 S3Object[] objects = s3Service.listObjects(bucket, PATH_DELIMITER, null);
343 for (int i = 0; i < objects.length; i++) {
344 Path path = keyToPath(objects[i].getKey());
345 sb.append(path).append("\n");
346 INode m = retrieveINode(path);
347 sb.append("\t").append(m.getFileType()).append("\n");
348 if (m.getFileType() == FileType.DIRECTORY) {
349 continue;
350 }
351 for (int j = 0; j < m.getBlocks().length; j++) {
352 sb.append("\t").append(m.getBlocks()[j]).append("\n");
353 }
354 }
355 } catch (S3ServiceException e) {
356 if (e.getCause() instanceof IOException) {
357 throw (IOException) e.getCause();
358 }
359 throw new S3Exception(e);
360 }
361 System.out.println(sb);
362 }
363
364 }