From b656e05aaf71208a7c146839b6432fe2aaaea5a6 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Sun, 29 Mar 2020 00:55:40 -0700 Subject: [PATCH] HDFS: support chunk cache --- .../java/seaweedfs/client/ChunkCache.java | 27 ++++++++++ .../java/seaweedfs/client/SeaweedRead.java | 49 ++++++------------- .../java/seaweedfs/client/SeaweedWrite.java | 3 +- 3 files changed, 43 insertions(+), 36 deletions(-) create mode 100644 other/java/client/src/main/java/seaweedfs/client/ChunkCache.java diff --git a/other/java/client/src/main/java/seaweedfs/client/ChunkCache.java b/other/java/client/src/main/java/seaweedfs/client/ChunkCache.java new file mode 100644 index 000000000..e249d4524 --- /dev/null +++ b/other/java/client/src/main/java/seaweedfs/client/ChunkCache.java @@ -0,0 +1,27 @@ +package seaweedfs.client; + +import com.google.common.cache.Cache; +import com.google.common.cache.CacheBuilder; + +import java.util.concurrent.TimeUnit; + +public class ChunkCache { + + private final Cache cache; + + public ChunkCache(int maxEntries) { + this.cache = CacheBuilder.newBuilder() + .maximumSize(maxEntries) + .expireAfterAccess(1, TimeUnit.HOURS) + .build(); + } + + public byte[] getChunk(String fileId) { + return this.cache.getIfPresent(fileId); + } + + public void setChunk(String fileId, byte[] data) { + this.cache.put(fileId, data); + } + +} diff --git a/other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java b/other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java index 8e850f87e..ad92ba006 100644 --- a/other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java +++ b/other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java @@ -10,14 +10,14 @@ import org.apache.http.util.EntityUtils; import java.io.Closeable; import java.io.IOException; -import java.io.OutputStream; -import java.nio.ByteBuffer; import java.util.*; public class SeaweedRead { // private static final Logger LOG = LoggerFactory.getLogger(SeaweedRead.class); + static ChunkCache chunkCache = new ChunkCache(1000); + // returns bytesRead public static long read(FilerGrpcClient filerGrpcClient, List visibleIntervals, final long position, final byte[] buffer, final int bufferOffset, @@ -57,42 +57,23 @@ public class SeaweedRead { } private static int readChunkView(long position, byte[] buffer, int startOffset, ChunkView chunkView, FilerProto.Locations locations) throws IOException { - if (chunkView.cipherKey != null) { - return readEncryptedChunkView(position, buffer, startOffset, chunkView, locations); + + byte[] chunkData = chunkCache.getChunk(chunkView.fileId); + + if (chunkData == null) { + chunkData = doFetchFullChunkData(chunkView, locations); } - // TODO: read the chunk and returns the chunk view data here + int len = (int) (chunkView.logicOffset - position + chunkView.size); + System.arraycopy(chunkData, (int) chunkView.offset, buffer, startOffset, len); - HttpClient client = new DefaultHttpClient(); - HttpGet request = new HttpGet( - String.format("http://%s/%s", locations.getLocations(0).getUrl(), chunkView.fileId)); + chunkCache.setChunk(chunkView.fileId, chunkData); - if (!chunkView.isFullChunk) { - request.setHeader(HttpHeaders.ACCEPT_ENCODING, ""); - request.setHeader(HttpHeaders.RANGE, - String.format("bytes=%d-%d", chunkView.offset, chunkView.offset + chunkView.size - 1)); - } - - try { - HttpResponse response = client.execute(request); - HttpEntity entity = response.getEntity(); - - int len = (int) (chunkView.logicOffset - position + chunkView.size); - OutputStream outputStream = new ByteBufferOutputStream(ByteBuffer.wrap(buffer, startOffset, len)); - entity.writeTo(outputStream); - // LOG.debug("* read chunkView:{} startOffset:{} length:{}", chunkView, startOffset, len); - - return len; - - } finally { - if (client instanceof Closeable) { - Closeable t = (Closeable) client; - t.close(); - } - } + return len; } - private static int readEncryptedChunkView(long position, byte[] buffer, int startOffset, ChunkView chunkView, FilerProto.Locations locations) throws IOException { + private static byte[] doFetchFullChunkData(ChunkView chunkView, FilerProto.Locations locations) throws IOException { + HttpClient client = new DefaultHttpClient(); HttpGet request = new HttpGet( String.format("http://%s/%s", locations.getLocations(0).getUrl(), chunkView.fileId)); @@ -124,9 +105,7 @@ public class SeaweedRead { throw new IOException("fail to decrypt", e); } - int len = (int) (chunkView.logicOffset - position + chunkView.size); - System.arraycopy(data, (int) chunkView.offset, buffer, startOffset, len); - return len; + return data; } diff --git a/other/java/client/src/main/java/seaweedfs/client/SeaweedWrite.java b/other/java/client/src/main/java/seaweedfs/client/SeaweedWrite.java index 5bf24ef68..178234d5a 100644 --- a/other/java/client/src/main/java/seaweedfs/client/SeaweedWrite.java +++ b/other/java/client/src/main/java/seaweedfs/client/SeaweedWrite.java @@ -45,7 +45,8 @@ public class SeaweedWrite { String etag = multipartUpload(targetUrl, auth, bytes, bytesOffset, bytesLength, cipherKey); - // TODO: cache fileId ~ bytes here + // cache fileId ~ bytes + SeaweedRead.chunkCache.setChunk(fileId, bytes); entry.addChunks(FilerProto.FileChunk.newBuilder() .setFileId(fileId)