From c85ee7c0fdb61fe401a8cab2512c65b4e48a8126 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Mon, 3 Dec 2018 22:12:20 -0800 Subject: [PATCH] HCFS: read concatenated files --- .../seaweed/hdfs/SeaweedFileSystemStore.java | 3 + .../java/seaweed/hdfs/SeaweedInputStream.java | 3 + .../main/java/seaweed/hdfs/SeaweedRead.java | 73 +++++++++++-------- .../java/seaweedfs/hdfs/SeaweedReadTest.java | 66 +++++++++++++++++ 4 files changed, 116 insertions(+), 29 deletions(-) create mode 100644 other/java/hdfs/src/test/java/seaweedfs/hdfs/SeaweedReadTest.java diff --git a/other/java/hdfs/src/main/java/seaweed/hdfs/SeaweedFileSystemStore.java b/other/java/hdfs/src/main/java/seaweed/hdfs/SeaweedFileSystemStore.java index dd68e53f1..a399fba13 100644 --- a/other/java/hdfs/src/main/java/seaweed/hdfs/SeaweedFileSystemStore.java +++ b/other/java/hdfs/src/main/java/seaweed/hdfs/SeaweedFileSystemStore.java @@ -225,10 +225,13 @@ public class SeaweedFileSystemStore { long writePosition = 0; if (!overwrite) { FilerProto.Entry existingEntry = lookupEntry(path); + LOG.debug("createFile merged entry path:{} existingEntry:{}", path, existingEntry); if (existingEntry != null) { + entry = FilerProto.Entry.newBuilder(); entry.mergeFrom(existingEntry); entry.getAttributesBuilder().setMtime(now); } + LOG.debug("createFile merged entry path:{} entry:{} from:{}", path, entry, existingEntry); writePosition = SeaweedRead.totalSize(existingEntry.getChunksList()); replication = existingEntry.getAttributes().getReplication(); } diff --git a/other/java/hdfs/src/main/java/seaweed/hdfs/SeaweedInputStream.java b/other/java/hdfs/src/main/java/seaweed/hdfs/SeaweedInputStream.java index b31cae166..c0b296fb9 100644 --- a/other/java/hdfs/src/main/java/seaweed/hdfs/SeaweedInputStream.java +++ b/other/java/hdfs/src/main/java/seaweed/hdfs/SeaweedInputStream.java @@ -55,6 +55,9 @@ public class SeaweedInputStream extends FSInputStream { this.readAheadEnabled = true; this.visibleIntervalList = SeaweedRead.nonOverlappingVisibleIntervals(entry.getChunksList()); + + LOG.debug("new path:{} entry:{} visibleIntervalList:{}", path, entry, visibleIntervalList); + } public String getPath() { diff --git a/other/java/hdfs/src/main/java/seaweed/hdfs/SeaweedRead.java b/other/java/hdfs/src/main/java/seaweed/hdfs/SeaweedRead.java index edc279adc..08aea5745 100644 --- a/other/java/hdfs/src/main/java/seaweed/hdfs/SeaweedRead.java +++ b/other/java/hdfs/src/main/java/seaweed/hdfs/SeaweedRead.java @@ -41,6 +41,7 @@ public class SeaweedRead { //TODO parallel this long readCount = 0; + int startOffset = bufferOffset; for (ChunkView chunkView : chunkViews) { FilerProto.Locations locations = vid2Locations.get(parseVolumeId(chunkView.fileId)); if (locations.getLocationsCount() == 0) { @@ -59,10 +60,11 @@ public class SeaweedRead { HttpEntity entity = response.getEntity(); int len = (int) (chunkView.logicOffset - position + chunkView.size); - entity.getContent().read(buffer, bufferOffset, len); + int chunReadCount = entity.getContent().read(buffer, startOffset, len); - LOG.debug("* read chunkView:{} length:{} position:{} bufferLength:{}", chunkView, len, position, bufferLength); + LOG.debug("* read chunkView:{} startOffset:{} length:{} chunReadCount:{}", chunkView, startOffset, len, chunReadCount); readCount += len; + startOffset += len; } catch (IOException e) { e.printStackTrace(); @@ -72,17 +74,20 @@ public class SeaweedRead { return readCount; } - private static List viewFromVisibles(List visibleIntervals, long offset, long size) { + public static List viewFromVisibles(List visibleIntervals, long offset, long size) { List views = new ArrayList<>(); long stop = offset + size; for (VisibleInterval chunk : visibleIntervals) { - views.add(new ChunkView( - chunk.fileId, - offset - chunk.start, - Math.min(chunk.stop, stop) - offset, - offset - )); + if (chunk.start <= offset && offset < chunk.stop && offset < stop) { + views.add(new ChunkView( + chunk.fileId, + offset - chunk.start, + Math.min(chunk.stop, stop) - offset, + offset + )); + offset = Math.min(chunk.stop, stop); + } } return views; } @@ -96,20 +101,10 @@ public class SeaweedRead { } }); - List newVisibles = new ArrayList<>(); List visibles = new ArrayList<>(); for (FilerProto.FileChunk chunk : chunks) { - List t = newVisibles; - newVisibles = mergeIntoVisibles(visibles, newVisibles, chunk); - if (t != newVisibles) { - // visibles are changed in place - } else { - // newVisibles are modified - visibles.clear(); - t = visibles; - visibles = newVisibles; - newVisibles = t; - } + List newVisibles = new ArrayList<>(); + visibles = mergeIntoVisibles(visibles, newVisibles, chunk); } return visibles; @@ -192,10 +187,10 @@ public class SeaweedRead { } public static class VisibleInterval { - long start; - long stop; - long modifiedTime; - String fileId; + public final long start; + public final long stop; + public final long modifiedTime; + public final String fileId; public VisibleInterval(long start, long stop, String fileId, long modifiedTime) { this.start = start; @@ -203,13 +198,23 @@ public class SeaweedRead { this.modifiedTime = modifiedTime; this.fileId = fileId; } + + @Override + public String toString() { + return "VisibleIntervalq{" + + "start=" + start + + ", stop=" + stop + + ", modifiedTime=" + modifiedTime + + ", fileId='" + fileId + '\'' + + '}'; + } } public static class ChunkView { - String fileId; - long offset; - long size; - long logicOffset; + public final String fileId; + public final long offset; + public final long size; + public final long logicOffset; public ChunkView(String fileId, long offset, long size, long logicOffset) { this.fileId = fileId; @@ -217,6 +222,16 @@ public class SeaweedRead { this.size = size; this.logicOffset = logicOffset; } + + @Override + public String toString() { + return "ChunkView{" + + "fileId='" + fileId + '\'' + + ", offset=" + offset + + ", size=" + size + + ", logicOffset=" + logicOffset + + '}'; + } } } diff --git a/other/java/hdfs/src/test/java/seaweedfs/hdfs/SeaweedReadTest.java b/other/java/hdfs/src/test/java/seaweedfs/hdfs/SeaweedReadTest.java new file mode 100644 index 000000000..4bb9efff5 --- /dev/null +++ b/other/java/hdfs/src/test/java/seaweedfs/hdfs/SeaweedReadTest.java @@ -0,0 +1,66 @@ +package seaweedfs.hdfs; + +import org.junit.Test; +import seaweed.hdfs.SeaweedRead; +import seaweedfs.client.FilerProto; + +import java.util.ArrayList; +import java.util.List; + +import static org.junit.Assert.assertEquals; + +public class SeaweedReadTest { + + @Test + public void testNonOverlappingVisibleIntervals() { + List chunks = new ArrayList<>(); + chunks.add(FilerProto.FileChunk.newBuilder() + .setFileId("aaa") + .setOffset(0) + .setSize(100) + .setMtime(1000) + .build()); + chunks.add(FilerProto.FileChunk.newBuilder() + .setFileId("bbb") + .setOffset(100) + .setSize(133) + .setMtime(2000) + .build()); + + List visibleIntervals = SeaweedRead.nonOverlappingVisibleIntervals(chunks); + for (SeaweedRead.VisibleInterval visibleInterval : visibleIntervals) { + System.out.println("visible:" + visibleInterval); + } + + assertEquals(visibleIntervals.size(), 2); + + SeaweedRead.VisibleInterval visibleInterval = visibleIntervals.get(0); + assertEquals(visibleInterval.start, 0); + assertEquals(visibleInterval.stop, 100); + assertEquals(visibleInterval.modifiedTime, 1000); + assertEquals(visibleInterval.fileId, "aaa"); + + visibleInterval = visibleIntervals.get(1); + assertEquals(visibleInterval.start, 100); + assertEquals(visibleInterval.stop, 233); + assertEquals(visibleInterval.modifiedTime, 2000); + assertEquals(visibleInterval.fileId, "bbb"); + + List chunkViews = SeaweedRead.viewFromVisibles(visibleIntervals, 0, 233); + + SeaweedRead.ChunkView chunkView = chunkViews.get(0); + assertEquals(chunkView.offset, 0); + assertEquals(chunkView.size, 100); + assertEquals(chunkView.logicOffset, 0); + assertEquals(chunkView.fileId, "aaa"); + + chunkView = chunkViews.get(1); + assertEquals(chunkView.offset, 0); + assertEquals(chunkView.size, 133); + assertEquals(chunkView.logicOffset, 100); + assertEquals(chunkView.fileId, "bbb"); + + + } + +}