Previously we wrote Arrow data to a stream. Now we will read that data back from a stream.
Just like in the previous post, we will implement the Closeable interface. This is needed to close the RootAllocator and free up memory.
We will pass in a ReadableByteChannel and read the stream's contents into objects.
package com.gkatzioura.arrow;
import java.io.Closeable;
import java.io.IOException;
import java.nio.channels.ReadableByteChannel;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.IntVector;
import org.apache.arrow.vector.VarCharVector;
import org.apache.arrow.vector.ipc.ArrowStreamReader;
/**
 * Reads {@link DefaultArrowEntry} rows from an Arrow IPC stream.
 *
 * <p>All vectors read by this instance are allocated from a {@link RootAllocator} owned by
 * the reader. Call {@link #close()} (or use try-with-resources) to release that allocator
 * once reading is finished.
 */
public class DefaultEntriesReader implements Closeable {

    private final RootAllocator rootAllocator;

    /** Creates a reader whose allocator is limited to {@code Integer.MAX_VALUE} bytes. */
    public DefaultEntriesReader() {
        rootAllocator = new RootAllocator(Integer.MAX_VALUE);
    }

    /**
     * Reads every record batch from {@code readableByteChannel} and converts each row into a
     * {@link DefaultArrowEntry}.
     *
     * <p>Column 0 is expected to be a {@link VarCharVector} (mapped to {@code col1}) and
     * column 1 an {@link IntVector} (mapped to {@code col2}). A null string cell becomes a
     * null {@code col1}.
     *
     * @param readableByteChannel channel positioned at the start of an Arrow stream
     * @return all decoded entries, in stream order (empty if the stream has no batches)
     * @throws IOException if the stream cannot be read or decoded
     */
    public List<DefaultArrowEntry> readBytes(ReadableByteChannel readableByteChannel) throws IOException {
        List<DefaultArrowEntry> defaultArrowEntries = new ArrayList<>();
        try (ArrowStreamReader arrowStreamReader = new ArrowStreamReader(readableByteChannel, rootAllocator)) {
            // Look up the root and its vectors once. loadNextBatch() refills this same root,
            // so these references remain valid for every batch.
            var root = arrowStreamReader.getVectorSchemaRoot();
            var col1Vector = (VarCharVector) root.getVector(0);
            var col2Vector = (IntVector) root.getVector(1);
            while (arrowStreamReader.loadNextBatch()) {
                int batchSize = root.getRowCount();
                for (int i = 0; i < batchSize; i++) {
                    // Arrow stores VarChar data as UTF-8. Decode it explicitly rather than
                    // relying on the platform default charset.
                    String strData = col1Vector.isNull(i)
                            ? null
                            : new String(col1Vector.get(i), StandardCharsets.UTF_8);
                    var intData = col2Vector.get(i);
                    defaultArrowEntries.add(
                            DefaultArrowEntry.builder().col1(strData).col2(intData).build());
                }
            }
        }
        return defaultArrowEntries;
    }

    /** Releases the allocator backing all vectors read by this instance. */
    @Override
    public void close() throws IOException {
        rootAllocator.close();
    }
}
Let’s wrap it up with a write and a read.
package com.gkatzioura.arrow;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.channels.Channels;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
/**
 * Round-trip demo: serializes sample entries to an in-memory Arrow stream, then reads
 * them back and prints them.
 */
public class ArrowMain {

    /**
     * Builds entries {@code data-0} through {@code data-11} and writes them through a
     * {@link DefaultEntriesWriter}, passing {@code 10} as the writer's second argument
     * (presumably the batch size). It then reads the bytes back with a
     * {@link DefaultEntriesReader} and prints one line per entry.
     *
     * @param args ignored
     * @throws IOException if writing or reading the stream fails
     */
    public static void main(String[] args) throws IOException {
        var sampleEntries = IntStream.rangeClosed(0, 11)
                .mapToObj(n -> new DefaultArrowEntry("data-" + n, n))
                .collect(Collectors.toList());

        var sink = new ByteArrayOutputStream();
        try (var writer = new DefaultEntriesWriter()) {
            writer.write(sampleEntries, 10, Channels.newChannel(sink));
        }

        try (var reader = new DefaultEntriesReader()) {
            var source = Channels.newChannel(new ByteArrayInputStream(sink.toByteArray()));
            reader.readBytes(source)
                    .forEach(entry -> System.out.println("Read " + entry.getCol1() + " " + entry.getCol2()));
        }
    }
}
Source: javacodegeeks.com
0 comments:
Post a Comment