Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,25 @@
import io.delta.kernel.engine.JsonHandler;
import io.delta.kernel.engine.ParquetHandler;

/** A Beam specific {@link Engine} wrapper that provides a custom {@link ParquetHandler}. */
/** A Beam specific {@link Engine} wrapper that provides custom engine handlers. */
public class BeamEngine implements Engine {
private final Engine delegate;
private final ParquetHandler parquetHandler;
private final FileSystemClient fileSystemClient;

/**
 * Creates an engine that wraps {@code delegate}, serves the given {@code parquetHandler}, and
 * uses a newly constructed {@link BeamFileSystemClient} for file system access.
 *
 * @param delegate the engine being wrapped
 * @param parquetHandler the Parquet handler this engine should expose
 */
public BeamEngine(Engine delegate, ParquetHandler parquetHandler) {
// Delegates to the three-argument constructor with a fresh BeamFileSystemClient.
this(delegate, parquetHandler, new BeamFileSystemClient());
}

/**
 * Creates an engine that wraps {@code delegate} and stores the given Parquet handler and file
 * system client as supplied.
 *
 * @param delegate the engine being wrapped
 * @param parquetHandler the Parquet handler this engine should expose
 * @param fileSystemClient the file system client this engine should expose
 */
public BeamEngine(
Engine delegate, ParquetHandler parquetHandler, FileSystemClient fileSystemClient) {
// No validation or copying is performed; the references are stored directly.
this.delegate = delegate;
this.parquetHandler = parquetHandler;
this.fileSystemClient = fileSystemClient;
}

/**
 * Wraps {@code delegate} so that its own Parquet handler is kept while file system access goes
 * through a new {@link BeamFileSystemClient}.
 *
 * @param delegate the engine to wrap
 * @return a {@code BeamEngine} built from the delegate, the delegate's Parquet handler and a new
 *     Beam-backed file system client
 */
public static BeamEngine withBeamFileSystemClient(Engine delegate) {
  ParquetHandler delegateParquetHandler = delegate.getParquetHandler();
  FileSystemClient beamClient = new BeamFileSystemClient();
  return new BeamEngine(delegate, delegateParquetHandler, beamClient);
}

@Override
Expand All @@ -45,7 +56,7 @@ public JsonHandler getJsonHandler() {

@Override
public FileSystemClient getFileSystemClient() {
return delegate.getFileSystemClient();
return fileSystemClient;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,236 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.io.delta;

import io.delta.kernel.engine.FileReadRequest;
import io.delta.kernel.engine.FileSystemClient;
import io.delta.kernel.utils.CloseableIterator;
import io.delta.kernel.utils.FileStatus;
import java.io.ByteArrayInputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.net.URI;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.ReadableByteChannel;
import java.nio.channels.SeekableByteChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import java.util.NoSuchElementException;
import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.io.fs.EmptyMatchTreatment;
import org.apache.beam.sdk.io.fs.MatchResult;
import org.apache.beam.sdk.io.fs.MoveOptions;
import org.apache.beam.sdk.io.fs.ResourceId;

/** A Delta Kernel {@link FileSystemClient} backed by Beam's {@link FileSystems}. */
public class BeamFileSystemClient implements FileSystemClient {
@Override
public CloseableIterator<FileStatus> listFrom(String path) throws IOException {
String glob = globForSiblings(path);
List<FileStatus> statuses = new ArrayList<>();
String normalizedInput = FileSystems.matchNewResource(path, false).toString();

for (MatchResult.Metadata metadata :
FileSystems.match(glob, EmptyMatchTreatment.ALLOW).metadata()) {
String metadataPath = metadata.resourceId().toString();
if (normalizeForOrdering(metadataPath).compareTo(normalizeForOrdering(path)) >= 0) {
statuses.add(toDeltaFileStatus(metadata));
}
}
Comment on lines +57 to +63

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

The normalizedInput variable is defined on line 55 but never used. Instead, the raw path parameter is used in the comparison on line 60. This can lead to incorrect filtering in listFrom if the input path and the matched metadata paths have different normalization formats (e.g., file:/ vs file:///).

Additionally, listFrom should filter out directories to prevent downstream components from attempting to read directories as files. We can achieve this by checking !metadata.resourceId().isDirectory().

    for (MatchResult.Metadata metadata :
        FileSystems.match(glob, EmptyMatchTreatment.ALLOW).metadata()) {
      if (metadata.resourceId().isDirectory()) {
        continue;
      }
      String metadataPath = metadata.resourceId().toString();
      if (normalizeForOrdering(metadataPath).compareTo(normalizeForOrdering(normalizedInput)) >= 0) {
        statuses.add(toDeltaFileStatus(metadata));
      }
    }

statuses.sort(
(first, second) ->
normalizeForOrdering(first.getPath())
.compareTo(normalizeForOrdering(second.getPath())));
return closeableIterator(statuses.iterator());
}

/**
 * Resolves {@code path} to the string form used by Beam's file systems.
 *
 * <p>The path reported by {@link #getFileStatus(String)} is preferred. If that lookup fails with
 * an {@link IOException}, the method falls back to the string form of the resource id Beam
 * creates for the path.
 *
 * @param path the path to resolve
 * @return the resolved path string
 */
@Override
public String resolvePath(String path) throws IOException {
  String resolved;
  try {
    resolved = getFileStatus(path).getPath();
  } catch (IOException lookupFailure) {
    // Best effort: the status lookup failed, so derive the name without touching the store.
    resolved = FileSystems.matchNewResource(path, false).toString();
  }
  return resolved;
}

/**
 * Lazily serves each read request as an in-memory stream over the requested byte range.
 *
 * <p>Nothing is read until {@code next()} is called on the returned iterator. A failed read is
 * reported as an {@link UncheckedIOException} carrying the path, offset and length of the
 * request. Closing the returned iterator closes {@code readRequests}.
 *
 * @param readRequests the requests to serve, in order
 * @return an iterator yielding one {@link ByteArrayInputStream} per request
 */
@Override
public CloseableIterator<ByteArrayInputStream> readFiles(
    CloseableIterator<FileReadRequest> readRequests) {
  return new CloseableIterator<ByteArrayInputStream>() {
    @Override
    public boolean hasNext() {
      return readRequests.hasNext();
    }

    @Override
    public ByteArrayInputStream next() {
      FileReadRequest pending = readRequests.next();
      try {
        String file = pending.getPath();
        int offset = pending.getStartOffset();
        int length = pending.getReadLength();
        return readRange(file, offset, length);
      } catch (IOException cause) {
        String message =
            String.format(
                "IOException reading from file %s at offset %s size %s",
                pending.getPath(), pending.getStartOffset(), pending.getReadLength());
        throw new UncheckedIOException(message, cause);
      }
    }

    @Override
    public void close() throws IOException {
      readRequests.close();
    }
  };
}

/**
 * Creates the directory {@code path}, including missing parents, when the path is local.
 *
 * <p>For paths that {@code isLocalPath} does not classify as local, nothing is created and the
 * method still reports success.
 *
 * @param path the directory to create
 * @return always {@code true}
 * @throws IOException if creating a local directory fails
 */
@Override
public boolean mkdirs(String path) throws IOException {
  if (!isLocalPath(path)) {
    return true;
  }
  Path directory = toLocalPath(path);
  Files.createDirectories(directory);
  return true;
}

/**
 * Deletes the resource at {@code path} through Beam's {@link FileSystems}, ignoring the case
 * where it does not exist.
 *
 * @param path the resource to delete
 * @return always {@code true} when no exception is thrown
 * @throws IOException if the underlying delete fails
 */
@Override
public boolean delete(String path) throws IOException {
  ResourceId target = FileSystems.matchNewResource(path, false);
  List<ResourceId> targets = Collections.singletonList(target);
  FileSystems.delete(targets, MoveOptions.StandardMoveOptions.IGNORE_MISSING_FILES);
  return true;
}

/**
 * Looks up the single resource matching {@code path} and converts its Beam metadata into a
 * Delta Kernel {@link FileStatus}.
 *
 * @param path the file spec to look up
 * @return the status built from the matched metadata
 * @throws IOException if the lookup fails
 */
@Override
public FileStatus getFileStatus(String path) throws IOException {
  MatchResult.Metadata matched = FileSystems.matchSingleFileSpec(path);
  return toDeltaFileStatus(matched);
}

/**
 * Copies {@code src} to {@code dst} through Beam's {@link FileSystems}.
 *
 * <p>When {@code overwrite} is {@code true} the copy is issued directly. Otherwise the
 * destination is probed first and the copy is refused if it already exists; the copy itself is
 * then issued with {@code SKIP_IF_DESTINATION_EXISTS}.
 *
 * <p>NOTE(review): the existence probe and the copy are two separate calls, so a destination
 * created in between is silently left untouched rather than reported. Whether the copy is
 * atomic depends on the underlying Beam file system — confirm this is acceptable for callers.
 *
 * @param src the file to copy
 * @param dst the location to copy to
 * @param overwrite whether an existing destination may be replaced
 * @throws java.nio.file.FileAlreadyExistsException if {@code overwrite} is {@code false} and the
 *     destination already exists
 * @throws IOException for any other failure reported by the file system
 */
@Override
public void copyFileAtomically(String src, String dst, boolean overwrite) throws IOException {
  ResourceId srcResource = FileSystems.matchNewResource(src, false);
  ResourceId dstResource = FileSystems.matchNewResource(dst, false);

  if (overwrite) {
    // The destination's current state is irrelevant here, so skip the existence probe.
    FileSystems.copy(
        Collections.singletonList(srcResource), Collections.singletonList(dstResource));
    return;
  }

  MatchResult dstMatch = FileSystems.match(dst, EmptyMatchTreatment.ALLOW);
  if (dstMatch.status() == MatchResult.Status.OK && !dstMatch.metadata().isEmpty()) {
    // FileAlreadyExistsException is an IOException, so existing catch blocks still apply.
    throw new java.nio.file.FileAlreadyExistsException(dst, null, "Destination already exists");
  }

  FileSystems.copy(
      Collections.singletonList(srcResource),
      Collections.singletonList(dstResource),
      MoveOptions.StandardMoveOptions.SKIP_IF_DESTINATION_EXISTS);
}

private static ByteArrayInputStream readRange(String path, int startOffset, int readLength)
throws IOException {
ResourceId resourceId = FileSystems.matchNewResource(path, false);
try (ReadableByteChannel channel = FileSystems.open(resourceId)) {
byte[] data = new byte[readLength];
if (channel instanceof SeekableByteChannel) {
((SeekableByteChannel) channel).position(startOffset);
readFully(channel, ByteBuffer.wrap(data));
} else {
try (InputStream stream = Channels.newInputStream(channel)) {
stream.skipNBytes(startOffset);
int read = stream.readNBytes(data, 0, readLength);
if (read != readLength) {
throw new EOFException(
String.format("Expected %s bytes from %s but read %s", readLength, path, read));
}
}
}
Comment on lines +161 to +170

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

The methods InputStream.skipNBytes (introduced in Java 12) and InputStream.readNBytes (introduced in Java 9) are used here. Since Apache Beam supports Java 8 and Java 11 as target runtimes, these calls will cause NoSuchMethodError at runtime: readNBytes is missing on Java 8, and skipNBytes is missing on both Java 8 and Java 11.

We can use Guava's ByteStreams.skipFully and ByteStreams.readFully instead, which are fully compatible with Java 8/11 and are already available on the classpath.

      } else {
        try (InputStream stream = Channels.newInputStream(channel)) {
          com.google.common.io.ByteStreams.skipFully(stream, startOffset);
          com.google.common.io.ByteStreams.readFully(stream, data);
        }
      }

return new ByteArrayInputStream(data);
}
}

/**
 * Reads from {@code channel} until {@code buffer} has no space remaining.
 *
 * @param channel the channel to read from
 * @param buffer the buffer to fill
 * @throws EOFException if the channel reports end of stream before the buffer is full
 * @throws IOException if the channel read fails
 */
private static void readFully(ReadableByteChannel channel, ByteBuffer buffer) throws IOException {
  int lastRead = 0;
  // Stop either when the buffer is full or as soon as the channel signals end of stream.
  while (lastRead >= 0 && buffer.hasRemaining()) {
    lastRead = channel.read(buffer);
  }
  if (lastRead < 0) {
    throw new EOFException("Unexpected end of file");
  }
}

/**
 * Converts Beam match metadata into a Delta Kernel {@link FileStatus} carrying the resource's
 * string form, its size in bytes and its last-modified time in milliseconds.
 */
private static FileStatus toDeltaFileStatus(MatchResult.Metadata metadata) {
  String location = metadata.resourceId().toString();
  long size = metadata.sizeBytes();
  long modifiedAt = metadata.lastModifiedMillis();
  return FileStatus.of(location, size, modifiedAt);
}

/**
 * Builds a glob matching every entry in the same directory as {@code path}.
 *
 * <p>Both {@code '/'} and {@code '\\'} count as separators. Everything up to and including the
 * last separator is kept verbatim and {@code "*"} is appended; a path with no separator yields
 * just {@code "*"}.
 */
private static String globForSiblings(String path) {
  // Index of the last separator of either kind, or -1 when the path has none.
  int lastSeparator = Math.max(path.lastIndexOf('/'), path.lastIndexOf('\\'));
  String parentPrefix = path.substring(0, lastSeparator + 1);
  return parentPrefix + "*";
}

/** Returns {@code path} with every backslash replaced by a forward slash, for comparisons. */
private static String normalizeForOrdering(String path) {
  String forwardSlashed = path.replace("\\", "/");
  return forwardSlashed;
}

/**
 * Decides whether {@code path} should be handled as a local file system path.
 *
 * <p>A path is local when it contains no {@code ':'}, when the text before the first {@code ':'}
 * is a single character, or when that text equals {@code "file"} ignoring case.
 */
private static boolean isLocalPath(String path) {
  int colon = path.indexOf(':');
  // No scheme at all, or a one-character prefix before the colon.
  if (colon < 0 || colon == 1) {
    return true;
  }
  String scheme = path.substring(0, colon);
  return scheme.toLowerCase(Locale.ROOT).equals("file");
}

/**
 * Converts {@code path} to a {@link Path}: strings starting with {@code "file:"} (ignoring case)
 * are parsed as URIs, anything else is passed to {@link Paths#get(String, String...)} as is.
 */
private static Path toLocalPath(String path) {
  boolean isFileUri = path.toLowerCase(Locale.ROOT).startsWith("file:");
  return isFileUri ? Paths.get(URI.create(path)) : Paths.get(path);
}

/**
 * Adapts a plain {@link Iterator} to a {@link CloseableIterator} whose {@code close()} does
 * nothing.
 *
 * @param iterator the iterator to adapt
 * @return a view over {@code iterator} that throws {@link NoSuchElementException} from
 *     {@code next()} once the underlying iterator is exhausted
 */
private static <T> CloseableIterator<T> closeableIterator(Iterator<T> iterator) {
  return new CloseableIterator<T>() {
    @Override
    public boolean hasNext() {
      return iterator.hasNext();
    }

    @Override
    public T next() {
      if (hasNext()) {
        return iterator.next();
      }
      throw new NoSuchElementException();
    }

    @Override
    public void close() {
      // Nothing to release: the wrapped iterator holds no closeable resource.
    }
  };
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ public void processElement(@Element String tablePath, OutputReceiver<DeltaReadTa
conf.set(entry.getKey(), entry.getValue());
}
}
Engine engine = DefaultEngine.create(conf);
Engine engine = BeamEngine.withBeamFileSystemClient(DefaultEngine.create(conf));
Table table = Table.forPath(engine, tablePath);
Snapshot snapshot = table.getLatestSnapshot(engine);
Scan scan = snapshot.getScanBuilder().build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ public PCollection<Row> expand(PBegin input) {
conf.set(entry.getKey(), entry.getValue());
}
}
Engine engine = DefaultEngine.create(conf);
Engine engine = BeamEngine.withBeamFileSystemClient(DefaultEngine.create(conf));
Table table = Table.forPath(engine, path);
io.delta.kernel.Snapshot snapshot = table.getLatestSnapshot(engine);
StructType deltaSchema = snapshot.getSchema();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ public DeltaReadTaskTracker newTracker(

@Setup
public void setUp() {
engine = DefaultEngine.create(getConfiguration());
engine = BeamEngine.withBeamFileSystemClient(DefaultEngine.create(getConfiguration()));
}

@ProcessElement
Expand All @@ -139,7 +139,8 @@ public ProcessContinuation processElement(
// of row groups that map to the current restriction.
BeamParquetHandler parquetHandler =
new BeamParquetHandler(getConfiguration(), currentEngine.getParquetHandler(), tracker);
BeamEngine beamEngine = new BeamEngine(currentEngine, parquetHandler);
BeamEngine beamEngine =
new BeamEngine(currentEngine, parquetHandler, currentEngine.getFileSystemClient());

long currentStartRgIndex = 0L;

Expand Down
Loading
Loading