1
0
Fork 0
mirror of https://github.com/chrislusf/seaweedfs synced 2024-09-09 02:20:34 +02:00

HDFS: support chunk cache

This commit is contained in:
Chris Lu 2020-03-29 00:55:40 -07:00
parent 057722bbf4
commit b656e05aaf
3 changed files with 43 additions and 36 deletions

View file

@ -0,0 +1,27 @@
package seaweedfs.client;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import java.util.concurrent.TimeUnit;
public class ChunkCache {
private final Cache<String, byte[]> cache;
public ChunkCache(int maxEntries) {
this.cache = CacheBuilder.newBuilder()
.maximumSize(maxEntries)
.expireAfterAccess(1, TimeUnit.HOURS)
.build();
}
public byte[] getChunk(String fileId) {
return this.cache.getIfPresent(fileId);
}
public void setChunk(String fileId, byte[] data) {
this.cache.put(fileId, data);
}
}

View file

@ -10,14 +10,14 @@ import org.apache.http.util.EntityUtils;
import java.io.Closeable;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.util.*;
public class SeaweedRead {
// private static final Logger LOG = LoggerFactory.getLogger(SeaweedRead.class);
static ChunkCache chunkCache = new ChunkCache(1000);
// returns bytesRead
public static long read(FilerGrpcClient filerGrpcClient, List<VisibleInterval> visibleIntervals,
final long position, final byte[] buffer, final int bufferOffset,
@ -57,42 +57,23 @@ public class SeaweedRead {
}
private static int readChunkView(long position, byte[] buffer, int startOffset, ChunkView chunkView, FilerProto.Locations locations) throws IOException {
if (chunkView.cipherKey != null) {
return readEncryptedChunkView(position, buffer, startOffset, chunkView, locations);
byte[] chunkData = chunkCache.getChunk(chunkView.fileId);
if (chunkData == null) {
chunkData = doFetchFullChunkData(chunkView, locations);
}
// TODO: read the chunk and returns the chunk view data here
int len = (int) (chunkView.logicOffset - position + chunkView.size);
System.arraycopy(chunkData, (int) chunkView.offset, buffer, startOffset, len);
HttpClient client = new DefaultHttpClient();
HttpGet request = new HttpGet(
String.format("http://%s/%s", locations.getLocations(0).getUrl(), chunkView.fileId));
chunkCache.setChunk(chunkView.fileId, chunkData);
if (!chunkView.isFullChunk) {
request.setHeader(HttpHeaders.ACCEPT_ENCODING, "");
request.setHeader(HttpHeaders.RANGE,
String.format("bytes=%d-%d", chunkView.offset, chunkView.offset + chunkView.size - 1));
}
try {
HttpResponse response = client.execute(request);
HttpEntity entity = response.getEntity();
int len = (int) (chunkView.logicOffset - position + chunkView.size);
OutputStream outputStream = new ByteBufferOutputStream(ByteBuffer.wrap(buffer, startOffset, len));
entity.writeTo(outputStream);
// LOG.debug("* read chunkView:{} startOffset:{} length:{}", chunkView, startOffset, len);
return len;
} finally {
if (client instanceof Closeable) {
Closeable t = (Closeable) client;
t.close();
}
}
return len;
}
private static int readEncryptedChunkView(long position, byte[] buffer, int startOffset, ChunkView chunkView, FilerProto.Locations locations) throws IOException {
private static byte[] doFetchFullChunkData(ChunkView chunkView, FilerProto.Locations locations) throws IOException {
HttpClient client = new DefaultHttpClient();
HttpGet request = new HttpGet(
String.format("http://%s/%s", locations.getLocations(0).getUrl(), chunkView.fileId));
@ -124,9 +105,7 @@ public class SeaweedRead {
throw new IOException("fail to decrypt", e);
}
int len = (int) (chunkView.logicOffset - position + chunkView.size);
System.arraycopy(data, (int) chunkView.offset, buffer, startOffset, len);
return len;
return data;
}

View file

@ -45,7 +45,8 @@ public class SeaweedWrite {
String etag = multipartUpload(targetUrl, auth, bytes, bytesOffset, bytesLength, cipherKey);
// TODO: cache fileId ~ bytes here
// cache fileId ~ bytes
SeaweedRead.chunkCache.setChunk(fileId, bytes);
entry.addChunks(FilerProto.FileChunk.newBuilder()
.setFileId(fileId)