Wednesday, June 2, 2021

Apache Arrow on the JVM: Streaming Reads

JVM Exam Prep, JVM Certification, JVM Tutorial and Material, JVM Career, Oracle Java Preparation, Oracle Java Exam Prep

Previously we wrote Arrow data to a stream. Now we shall read that data back from a stream.

Just like in the previous post, we shall implement the Closeable interface. This is needed to close the RootAllocator and free up memory.

We shall pass a ReadableByteChannel to the reader and convert the contents of the stream back into objects.

package com.gkatzioura.arrow;

import java.io.Closeable;
import java.io.IOException;
import java.nio.channels.ReadableByteChannel;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.IntVector;
import org.apache.arrow.vector.VarCharVector;
import org.apache.arrow.vector.ipc.ArrowStreamReader;

public class DefaultEntriesReader implements Closeable {

    private final RootAllocator rootAllocator;

    public DefaultEntriesReader() {

        rootAllocator = new RootAllocator(Integer.MAX_VALUE);

    }

    public List<DefaultArrowEntry> readBytes(ReadableByteChannel readableByteChannel) throws IOException {

        List<DefaultArrowEntry> defaultArrowEntries = new ArrayList<>();

        try(ArrowStreamReader arrowStreamReader = new ArrowStreamReader(readableByteChannel, rootAllocator)) {

            var root = arrowStreamReader.getVectorSchemaRoot();

            var childVector1 = (VarCharVector)root.getVector(0);

            var childVector2 = (IntVector)root.getVector(1);

            while (arrowStreamReader.loadNextBatch()) {

                int batchSize = root.getRowCount();

                for (int i = 0; i < batchSize; i++) {

                    var strData = new String(childVector1.get(i));

                    var intData = childVector2.get(i);

                    DefaultArrowEntry defaultArrowEntry = DefaultArrowEntry.builder().col1(strData).col2(intData).build();

                    defaultArrowEntries.add(defaultArrowEntry);

                }

            }

            return defaultArrowEntries;

        }

    }

    @Override

    public void close() throws IOException {

        rootAllocator.close();

    }

}

JVM Exam Prep, JVM Certification, JVM Tutorial and Material, JVM Career, Oracle Java Preparation, Oracle Java Exam Prep
Let’s wrap it up with a write followed by a read.

package com.gkatzioura.arrow;

import java.io.ByteArrayInputStream;

import java.io.ByteArrayOutputStream;

import java.io.IOException;

import java.nio.channels.Channels;

import java.util.stream.Collectors;

import java.util.stream.IntStream;

public class ArrowMain {

    public static void main(String[] args) throws IOException {

        var originalEntries = IntStream.rangeClosed(0, 11)

                             .boxed()

                             .map(i -> new DefaultArrowEntry("data-"+i, i)).collect(Collectors.toList());

        var outputStream = new ByteArrayOutputStream();

        try(var arrowWriter = new DefaultEntriesWriter()) {

            arrowWriter.write(originalEntries, 10, Channels.newChannel(outputStream));

        }

        byte[] introBytes = outputStream.toByteArray();

        var inputStream = new ByteArrayInputStream(introBytes);

        try(var arrowReader = new DefaultEntriesReader()) {

            var entries =arrowReader.readBytes(Channels.newChannel(inputStream));

            for (DefaultArrowEntry entry : entries) {

                System.out.println("Read "+entry.getCol1()+" "+entry.getCol2());

            }

        }

    }

}

Source: javacodegeeks.com

Related Posts

0 comments:

Post a Comment