From a9efaa6385eb4a2e166cfbe8d8f5498d7dd95a91 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Tue, 1 Dec 2020 17:20:31 -0800 Subject: [PATCH] HDFS: implement ByteBufferReadable fix https://github.com/chrislusf/seaweedfs/issues/1645 --- .../main/java/seaweed/hdfs/SeaweedInputStream.java | 12 ++++++++++-- .../main/java/seaweed/hdfs/SeaweedInputStream.java | 12 ++++++++++-- 2 files changed, 20 insertions(+), 4 deletions(-) diff --git a/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedInputStream.java b/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedInputStream.java index 8e406206d..2cf544162 100644 --- a/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedInputStream.java +++ b/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedInputStream.java @@ -2,6 +2,7 @@ package seaweed.hdfs; // based on org.apache.hadoop.fs.azurebfs.services.AbfsInputStream +import org.apache.hadoop.fs.ByteBufferReadable; import org.apache.hadoop.fs.FSExceptionMessages; import org.apache.hadoop.fs.FSInputStream; import org.apache.hadoop.fs.FileSystem.Statistics; @@ -13,9 +14,10 @@ import seaweedfs.client.SeaweedRead; import java.io.EOFException; import java.io.IOException; +import java.nio.ByteBuffer; import java.util.List; -public class SeaweedInputStream extends FSInputStream { +public class SeaweedInputStream extends FSInputStream implements ByteBufferReadable { private static final Logger LOG = LoggerFactory.getLogger(SeaweedInputStream.class); @@ -85,7 +87,7 @@ public class SeaweedInputStream extends FSInputStream { } long bytesRead = 0; - if (position+len < entry.getContent().size()) { + if (position+len <= entry.getContent().size()) { entry.getContent().copyTo(b, (int) position, (int) off, len); } else { bytesRead = SeaweedRead.read(this.filerGrpcClient, this.visibleIntervalList, this.position, b, off, len, SeaweedRead.fileSize(entry)); @@ -106,6 +108,12 @@ public class SeaweedInputStream extends FSInputStream { } + // implement ByteBufferReadable + @Override + public synchronized int read(ByteBuffer buf) throws IOException { + return read(buf.array(), buf.position(), buf.remaining()); + } + /** * Seek to given position in stream. * diff --git a/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedInputStream.java b/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedInputStream.java index 8e406206d..2cf544162 100644 --- a/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedInputStream.java +++ b/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedInputStream.java @@ -2,6 +2,7 @@ package seaweed.hdfs; // based on org.apache.hadoop.fs.azurebfs.services.AbfsInputStream +import org.apache.hadoop.fs.ByteBufferReadable; import org.apache.hadoop.fs.FSExceptionMessages; import org.apache.hadoop.fs.FSInputStream; import org.apache.hadoop.fs.FileSystem.Statistics; @@ -13,9 +14,10 @@ import seaweedfs.client.SeaweedRead; import java.io.EOFException; import java.io.IOException; +import java.nio.ByteBuffer; import java.util.List; -public class SeaweedInputStream extends FSInputStream { +public class SeaweedInputStream extends FSInputStream implements ByteBufferReadable { private static final Logger LOG = LoggerFactory.getLogger(SeaweedInputStream.class); @@ -85,7 +87,7 @@ public class SeaweedInputStream extends FSInputStream { } long bytesRead = 0; - if (position+len < entry.getContent().size()) { + if (position+len <= entry.getContent().size()) { entry.getContent().copyTo(b, (int) position, (int) off, len); } else { bytesRead = SeaweedRead.read(this.filerGrpcClient, this.visibleIntervalList, this.position, b, off, len, SeaweedRead.fileSize(entry)); @@ -106,6 +108,12 @@ public class SeaweedInputStream extends FSInputStream { } + // implement ByteBufferReadable + @Override + public synchronized int read(ByteBuffer buf) throws IOException { + return read(buf.array(), buf.position(), buf.remaining()); + } + /** * Seek to given position in stream. *