From fb93ebb56e14d469b7ed71da92a14172f4841f75 Mon Sep 17 00:00:00 2001 From: joaohmalves Date: Tue, 9 Jun 2026 14:35:32 -0300 Subject: [PATCH] Implement BeamFileSystemClient for Delta Lake connector (#38554) --- .../apache/beam/sdk/io/delta/BeamEngine.java | 15 +- .../sdk/io/delta/BeamFileSystemClient.java | 236 ++++++++++++ .../sdk/io/delta/CreateReadTasksDoFn.java | 2 +- .../org/apache/beam/sdk/io/delta/DeltaIO.java | 2 +- .../beam/sdk/io/delta/DeltaSourceDoFn.java | 5 +- .../io/delta/BeamFileSystemClientTest.java | 347 ++++++++++++++++++ .../apache/beam/sdk/io/delta/DeltaIOTest.java | 74 +++- 7 files changed, 670 insertions(+), 11 deletions(-) create mode 100644 sdks/java/io/delta/src/main/java/org/apache/beam/sdk/io/delta/BeamFileSystemClient.java create mode 100644 sdks/java/io/delta/src/test/java/org/apache/beam/sdk/io/delta/BeamFileSystemClientTest.java diff --git a/sdks/java/io/delta/src/main/java/org/apache/beam/sdk/io/delta/BeamEngine.java b/sdks/java/io/delta/src/main/java/org/apache/beam/sdk/io/delta/BeamEngine.java index de82d8d01b81..490255725c5b 100644 --- a/sdks/java/io/delta/src/main/java/org/apache/beam/sdk/io/delta/BeamEngine.java +++ b/sdks/java/io/delta/src/main/java/org/apache/beam/sdk/io/delta/BeamEngine.java @@ -23,14 +23,25 @@ import io.delta.kernel.engine.JsonHandler; import io.delta.kernel.engine.ParquetHandler; -/** A Beam specific {@link Engine} wrapper that provides a custom {@link ParquetHandler}. */ +/** A Beam specific {@link Engine} wrapper that provides custom engine handlers. */ public class BeamEngine implements Engine { private final Engine delegate; private final ParquetHandler parquetHandler; + private final FileSystemClient fileSystemClient; public BeamEngine(Engine delegate, ParquetHandler parquetHandler) { + this(delegate, parquetHandler, new BeamFileSystemClient()); + } + + public BeamEngine( + Engine delegate, ParquetHandler parquetHandler, FileSystemClient fileSystemClient) { this.delegate = delegate; this.parquetHandler = parquetHandler; + this.fileSystemClient = fileSystemClient; + } + + public static BeamEngine withBeamFileSystemClient(Engine delegate) { + return new BeamEngine(delegate, delegate.getParquetHandler(), new BeamFileSystemClient()); } @Override @@ -45,7 +56,7 @@ public JsonHandler getJsonHandler() { @Override public FileSystemClient getFileSystemClient() { - return delegate.getFileSystemClient(); + return fileSystemClient; } @Override diff --git a/sdks/java/io/delta/src/main/java/org/apache/beam/sdk/io/delta/BeamFileSystemClient.java b/sdks/java/io/delta/src/main/java/org/apache/beam/sdk/io/delta/BeamFileSystemClient.java new file mode 100644 index 000000000000..83336e48d3d1 --- /dev/null +++ b/sdks/java/io/delta/src/main/java/org/apache/beam/sdk/io/delta/BeamFileSystemClient.java @@ -0,0 +1,236 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.delta; + +import io.delta.kernel.engine.FileReadRequest; +import io.delta.kernel.engine.FileSystemClient; +import io.delta.kernel.utils.CloseableIterator; +import io.delta.kernel.utils.FileStatus; +import java.io.ByteArrayInputStream; +import java.io.EOFException; +import java.io.IOException; +import java.io.InputStream; +import java.io.UncheckedIOException; +import java.net.URI; +import java.nio.ByteBuffer; +import java.nio.channels.Channels; +import java.nio.channels.ReadableByteChannel; +import java.nio.channels.SeekableByteChannel; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.Locale; +import java.util.NoSuchElementException; +import org.apache.beam.sdk.io.FileSystems; +import org.apache.beam.sdk.io.fs.EmptyMatchTreatment; +import org.apache.beam.sdk.io.fs.MatchResult; +import org.apache.beam.sdk.io.fs.MoveOptions; +import org.apache.beam.sdk.io.fs.ResourceId; + +/** A Delta Kernel {@link FileSystemClient} backed by Beam's {@link FileSystems}. */ +public class BeamFileSystemClient implements FileSystemClient { + @Override + public CloseableIterator listFrom(String path) throws IOException { + String glob = globForSiblings(path); + List statuses = new ArrayList<>(); + String normalizedInput = FileSystems.matchNewResource(path, false).toString(); + + for (MatchResult.Metadata metadata : + FileSystems.match(glob, EmptyMatchTreatment.ALLOW).metadata()) { + String metadataPath = metadata.resourceId().toString(); + if (normalizeForOrdering(metadataPath).compareTo(normalizeForOrdering(path)) >= 0) { + statuses.add(toDeltaFileStatus(metadata)); + } + } + statuses.sort( + (first, second) -> + normalizeForOrdering(first.getPath()) + .compareTo(normalizeForOrdering(second.getPath()))); + return closeableIterator(statuses.iterator()); + } + + @Override + public String resolvePath(String path) throws IOException { + try { + return getFileStatus(path).getPath(); + } catch (IOException e) { + return FileSystems.matchNewResource(path, false).toString(); + } + } + + @Override + public CloseableIterator readFiles( + CloseableIterator readRequests) { + return new CloseableIterator() { + @Override + public boolean hasNext() { + return readRequests.hasNext(); + } + + @Override + public ByteArrayInputStream next() { + FileReadRequest request = readRequests.next(); + try { + return readRange(request.getPath(), request.getStartOffset(), request.getReadLength()); + } catch (IOException e) { + throw new UncheckedIOException( + String.format( + "IOException reading from file %s at offset %s size %s", + request.getPath(), request.getStartOffset(), request.getReadLength()), + e); + } + } + + @Override + public void close() throws IOException { + readRequests.close(); + } + }; + } + + @Override + public boolean mkdirs(String path) throws IOException { + if (isLocalPath(path)) { + Files.createDirectories(toLocalPath(path)); + } + return true; + } + + @Override + public boolean delete(String path) throws IOException { + FileSystems.delete( + Collections.singletonList(FileSystems.matchNewResource(path, false)), + MoveOptions.StandardMoveOptions.IGNORE_MISSING_FILES); + return true; + } + + @Override + public FileStatus getFileStatus(String path) throws IOException { + return toDeltaFileStatus(FileSystems.matchSingleFileSpec(path)); + } + + @Override + public void copyFileAtomically(String src, String dst, boolean overwrite) throws IOException { + ResourceId srcResource = FileSystems.matchNewResource(src, false); + ResourceId dstResource = FileSystems.matchNewResource(dst, false); + MatchResult dstMatch = FileSystems.match(dst, EmptyMatchTreatment.ALLOW); + if (!overwrite + && dstMatch.status() == MatchResult.Status.OK + && !dstMatch.metadata().isEmpty()) { + throw new IOException("Destination already exists: " + dst); + } + + if (overwrite) { + FileSystems.copy( + Collections.singletonList(srcResource), Collections.singletonList(dstResource)); + } else { + FileSystems.copy( + Collections.singletonList(srcResource), + Collections.singletonList(dstResource), + MoveOptions.StandardMoveOptions.SKIP_IF_DESTINATION_EXISTS); + } + } + + private static ByteArrayInputStream readRange(String path, int startOffset, int readLength) + throws IOException { + ResourceId resourceId = FileSystems.matchNewResource(path, false); + try (ReadableByteChannel channel = FileSystems.open(resourceId)) { + byte[] data = new byte[readLength]; + if (channel instanceof SeekableByteChannel) { + ((SeekableByteChannel) channel).position(startOffset); + readFully(channel, ByteBuffer.wrap(data)); + } else { + try (InputStream stream = Channels.newInputStream(channel)) { + stream.skipNBytes(startOffset); + int read = stream.readNBytes(data, 0, readLength); + if (read != readLength) { + throw new EOFException( + String.format("Expected %s bytes from %s but read %s", readLength, path, read)); + } + } + } + return new ByteArrayInputStream(data); + } + } + + private static void readFully(ReadableByteChannel channel, ByteBuffer buffer) throws IOException { + while (buffer.hasRemaining()) { + if (channel.read(buffer) < 0) { + throw new EOFException("Unexpected end of file"); + } + } + } + + private static FileStatus toDeltaFileStatus(MatchResult.Metadata metadata) { + return FileStatus.of( + metadata.resourceId().toString(), metadata.sizeBytes(), metadata.lastModifiedMillis()); + } + + private static String globForSiblings(String path) { + String normalized = path.replace('\\', '/'); + int lastSlash = normalized.lastIndexOf('/'); + if (lastSlash < 0) { + return "*"; + } + return path.substring(0, lastSlash + 1) + "*"; + } + + private static String normalizeForOrdering(String path) { + return path.replace('\\', '/'); + } + + private static boolean isLocalPath(String path) { + int schemeSeparator = path.indexOf(':'); + if (schemeSeparator < 0) { + return true; + } + String scheme = path.substring(0, schemeSeparator).toLowerCase(Locale.ROOT); + return scheme.length() == 1 || "file".equals(scheme); + } + + private static Path toLocalPath(String path) { + if (path.toLowerCase(Locale.ROOT).startsWith("file:")) { + return Paths.get(URI.create(path)); + } + return Paths.get(path); + } + + private static CloseableIterator closeableIterator(Iterator iterator) { + return new CloseableIterator() { + @Override + public boolean hasNext() { + return iterator.hasNext(); + } + + @Override + public T next() { + if (!hasNext()) { + throw new NoSuchElementException(); + } + return iterator.next(); + } + + @Override + public void close() {} + }; + } +} diff --git a/sdks/java/io/delta/src/main/java/org/apache/beam/sdk/io/delta/CreateReadTasksDoFn.java b/sdks/java/io/delta/src/main/java/org/apache/beam/sdk/io/delta/CreateReadTasksDoFn.java index 36c9a1a47f8c..200e844affc7 100644 --- a/sdks/java/io/delta/src/main/java/org/apache/beam/sdk/io/delta/CreateReadTasksDoFn.java +++ b/sdks/java/io/delta/src/main/java/org/apache/beam/sdk/io/delta/CreateReadTasksDoFn.java @@ -52,7 +52,7 @@ public void processElement(@Element String tablePath, OutputReceiver expand(PBegin input) { conf.set(entry.getKey(), entry.getValue()); } } - Engine engine = DefaultEngine.create(conf); + Engine engine = BeamEngine.withBeamFileSystemClient(DefaultEngine.create(conf)); Table table = Table.forPath(engine, path); io.delta.kernel.Snapshot snapshot = table.getLatestSnapshot(engine); StructType deltaSchema = snapshot.getSchema(); diff --git a/sdks/java/io/delta/src/main/java/org/apache/beam/sdk/io/delta/DeltaSourceDoFn.java b/sdks/java/io/delta/src/main/java/org/apache/beam/sdk/io/delta/DeltaSourceDoFn.java index bd53c3c9d045..f17d85bd47d9 100644 --- a/sdks/java/io/delta/src/main/java/org/apache/beam/sdk/io/delta/DeltaSourceDoFn.java +++ b/sdks/java/io/delta/src/main/java/org/apache/beam/sdk/io/delta/DeltaSourceDoFn.java @@ -114,7 +114,7 @@ public DeltaReadTaskTracker newTracker( @Setup public void setUp() { - engine = DefaultEngine.create(getConfiguration()); + engine = BeamEngine.withBeamFileSystemClient(DefaultEngine.create(getConfiguration())); } @ProcessElement @@ -139,7 +139,8 @@ public ProcessContinuation processElement( // of row groups that map to the current restriction. BeamParquetHandler parquetHandler = new BeamParquetHandler(getConfiguration(), currentEngine.getParquetHandler(), tracker); - BeamEngine beamEngine = new BeamEngine(currentEngine, parquetHandler); + BeamEngine beamEngine = + new BeamEngine(currentEngine, parquetHandler, currentEngine.getFileSystemClient()); long currentStartRgIndex = 0L; diff --git a/sdks/java/io/delta/src/test/java/org/apache/beam/sdk/io/delta/BeamFileSystemClientTest.java b/sdks/java/io/delta/src/test/java/org/apache/beam/sdk/io/delta/BeamFileSystemClientTest.java new file mode 100644 index 000000000000..46e9aa7a15bc --- /dev/null +++ b/sdks/java/io/delta/src/test/java/org/apache/beam/sdk/io/delta/BeamFileSystemClientTest.java @@ -0,0 +1,347 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.delta; + +import io.delta.kernel.engine.FileReadRequest; +import io.delta.kernel.internal.util.Utils; +import io.delta.kernel.utils.CloseableIterator; +import io.delta.kernel.utils.FileStatus; +import java.io.ByteArrayInputStream; +import java.io.File; +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import org.junit.Assert; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** Unit tests for {@link BeamFileSystemClient}. */ +@RunWith(JUnit4.class) +public class BeamFileSystemClientTest { + + @Rule public TemporaryFolder tempFolder = new TemporaryFolder(); + + private final BeamFileSystemClient client = new BeamFileSystemClient(); + + // -------------------------------------------------------------------------- + // getFileStatus + // -------------------------------------------------------------------------- + + @Test + public void testGetFileStatus() throws Exception { + File file = createFile("test.json", "hello"); + + FileStatus status = client.getFileStatus(file.getAbsolutePath()); + + Assert.assertEquals(file.length(), status.getSize()); + } + + @Test(expected = IOException.class) + public void testGetFileStatusNonExistentThrows() throws Exception { + client.getFileStatus(tempFolder.getRoot().getAbsolutePath() + "/nonexistent.json"); + } + + // -------------------------------------------------------------------------- + // listFrom + // -------------------------------------------------------------------------- + + @Test + public void testListFromReturnsFilesStartingFromPath() throws Exception { + File dir = tempFolder.newFolder("list-test"); + + createFile(dir, "00000000000000000000.json", "a"); + File second = createFile(dir, "00000000000000000001.json", "b"); + createFile(dir, "00000000000000000002.json", "c"); + + Assert.assertEquals( + Arrays.asList( + "00000000000000000001.json", + "00000000000000000002.json"), + listFileNames(second.getAbsolutePath())); + } + + @Test + public void testListFromEmptyDirectoryReturnsEmpty() throws Exception { + File dir = tempFolder.newFolder("empty-dir"); + + String startPath = + dir.getAbsolutePath() + "/00000000000000000000.json"; + + Assert.assertTrue(listFileNames(startPath).isEmpty()); + } + + @Test + public void testListFromIncludesStartFile() throws Exception { + File dir = tempFolder.newFolder("list-inclusive"); + + File only = + createFile(dir, "00000000000000000005.json", "x"); + + Assert.assertEquals( + Arrays.asList("00000000000000000005.json"), + listFileNames(only.getAbsolutePath())); + } + + // -------------------------------------------------------------------------- + // resolvePath + // -------------------------------------------------------------------------- + + @Test + public void testResolvePathExistingFile() throws Exception { + File file = createFile("resolve-test.json", "data"); + + String resolved = client.resolvePath(file.getAbsolutePath()); + + Assert.assertNotNull(resolved); + Assert.assertTrue(resolved.contains("resolve-test.json")); + } + + @Test + public void testResolvePathNonExistentFallsBackToMatchNewResource() throws Exception { + String path = + tempFolder.getRoot().getAbsolutePath() + "/does-not-exist.json"; + + String resolved = client.resolvePath(path); + + Assert.assertNotNull(resolved); + Assert.assertTrue(resolved.contains("does-not-exist.json")); + } + + // -------------------------------------------------------------------------- + // readFiles + // -------------------------------------------------------------------------- + + @Test + public void testReadFilesWithOffsetAndLength() throws Exception { + File file = createFile("read-test.json", "0123456789"); + + Assert.assertEquals( + "2345", + readContent(makeRequest(file.getAbsolutePath(), 2, 4))); + } + + @Test + public void testReadFilesFromStart() throws Exception { + File file = createFile("read-start.json", "abcdef"); + + Assert.assertEquals( + "abc", + readContent(makeRequest(file.getAbsolutePath(), 0, 3))); + } + + // -------------------------------------------------------------------------- + // mkdirs + // -------------------------------------------------------------------------- + + @Test + public void testMkdirsCreatesLocalDirectory() throws Exception { + File nested = + new File(tempFolder.getRoot(), "a/b/c"); + + Assert.assertFalse(nested.exists()); + + boolean result = client.mkdirs(nested.getAbsolutePath()); + + Assert.assertTrue(result); + Assert.assertTrue(nested.isDirectory()); + } + + @Test + public void testMkdirsOnObjectStorageReturnsTrue() throws Exception { + boolean result = + client.mkdirs("gs://some-bucket/some/path/"); + + Assert.assertTrue(result); + } + + // -------------------------------------------------------------------------- + // delete + // -------------------------------------------------------------------------- + + @Test + public void testDeleteExistingFile() throws Exception { + File file = createFile("to-delete.json", "bye"); + + Assert.assertTrue(file.exists()); + + boolean result = + client.delete(file.getAbsolutePath()); + + Assert.assertTrue(result); + Assert.assertFalse(file.exists()); + } + + @Test + public void testDeleteNonExistentFileDoesNotThrow() throws Exception { + String path = + tempFolder.getRoot().getAbsolutePath() + "/ghost.json"; + + boolean result = client.delete(path); + + Assert.assertTrue(result); + } + + // -------------------------------------------------------------------------- + // copyFileAtomically + // -------------------------------------------------------------------------- + + @Test + public void testCopyFileAtomicallyWithoutOverwrite() throws Exception { + File src = createFile("src.json", "content"); + + File dst = + new File(tempFolder.getRoot(), "dst.json"); + + Assert.assertFalse(dst.exists()); + + client.copyFileAtomically( + src.getAbsolutePath(), + dst.getAbsolutePath(), + false); + + verifyFileContent(dst, "content"); + } + + @Test + public void testCopyFileAtomicallyWithOverwrite() throws Exception { + File src = + createFile("src-overwrite.json", "new-content"); + + File dst = + createFile("dst-overwrite.json", "old-content"); + + client.copyFileAtomically( + src.getAbsolutePath(), + dst.getAbsolutePath(), + true); + + verifyFileContent(dst, "new-content"); + } + + @Test(expected = IOException.class) + public void testCopyFileAtomicallyWithoutOverwriteThrowsIfDestinationExists() + throws Exception { + + File src = + createFile("src-no-overwrite.json", "data"); + + File dst = + createFile("dst-no-overwrite.json", "existing"); + + client.copyFileAtomically( + src.getAbsolutePath(), + dst.getAbsolutePath(), + false); + } + + // -------------------------------------------------------------------------- + // Helpers + // -------------------------------------------------------------------------- + + private File createFile(String name, String content) throws IOException { + File file = tempFolder.newFile(name); + + Files.write( + file.toPath(), + content.getBytes(StandardCharsets.UTF_8)); + + return file; + } + + private File createFile( + File directory, + String name, + String content) + throws IOException { + + File file = new File(directory, name); + + Files.write( + file.toPath(), + content.getBytes(StandardCharsets.UTF_8)); + + return file; + } + + private void verifyFileContent( + File file, + String expected) + throws IOException { + + Assert.assertEquals( + expected, + Files.readString(file.toPath())); + } + + private List listFileNames(String startPath) throws Exception { + List listed = new ArrayList<>(); + + try (CloseableIterator iter = + client.listFrom(startPath)) { + + while (iter.hasNext()) { + listed.add( + new File(iter.next().getPath()).getName()); + } + } + + return listed; + } + + private String readContent(FileReadRequest request) throws Exception { + try (CloseableIterator streams = + client.readFiles( + Utils.singletonCloseableIterator(request))) { + + Assert.assertTrue(streams.hasNext()); + + return new String( + streams.next().readAllBytes(), + StandardCharsets.UTF_8); + } + } + + private static FileReadRequest makeRequest( + String path, + int startOffset, + int readLength) { + + return new FileReadRequest() { + @Override + public String getPath() { + return path; + } + + @Override + public int getStartOffset() { + return startOffset; + } + + @Override + public int getReadLength() { + return readLength; + } + }; + } +} diff --git a/sdks/java/io/delta/src/test/java/org/apache/beam/sdk/io/delta/DeltaIOTest.java b/sdks/java/io/delta/src/test/java/org/apache/beam/sdk/io/delta/DeltaIOTest.java index ef6dd660c607..f40a63b76c01 100644 --- a/sdks/java/io/delta/src/test/java/org/apache/beam/sdk/io/delta/DeltaIOTest.java +++ b/sdks/java/io/delta/src/test/java/org/apache/beam/sdk/io/delta/DeltaIOTest.java @@ -205,12 +205,13 @@ public void testCreateReadTasksDoFn() throws Exception { org.apache.beam.sdk.values.TypeDescriptors.strings()) .via( task -> - io.delta.kernel.internal.InternalScanFileUtils.getAddFileStatus( - task.getScanFileRows().get(0)) - .getPath())); + new File( + io.delta.kernel.internal.InternalScanFileUtils.getAddFileStatus( + task.getScanFileRows().get(0)) + .getPath()) + .getName())); - PAssert.that(paths) - .containsInAnyOrder("file:" + tableDir.getAbsolutePath() + "/part-00000.parquet"); + PAssert.that(paths).containsInAnyOrder("part-00000.parquet"); writePipeline.run().waitUntilFinish(); } @@ -564,6 +565,69 @@ public void close() {} BeamEngine beamEngine = new BeamEngine(io.delta.kernel.defaults.engine.DefaultEngine.create(conf), handler); org.junit.Assert.assertEquals(handler, beamEngine.getParquetHandler()); + org.junit.Assert.assertTrue(beamEngine.getFileSystemClient() instanceof BeamFileSystemClient); + } + + @Test + public void testBeamFileSystemClient() throws Exception { + File dir = tempFolder.newFolder("beam-fs-client"); + File first = new File(dir, "00000000000000000000.json"); + File second = new File(dir, "00000000000000000001.json"); + File third = new File(dir, "00000000000000000002.json"); + Files.write(first.toPath(), "first".getBytes(StandardCharsets.UTF_8)); + Files.write(second.toPath(), "0123456789".getBytes(StandardCharsets.UTF_8)); + Files.write(third.toPath(), "third".getBytes(StandardCharsets.UTF_8)); + + BeamFileSystemClient client = new BeamFileSystemClient(); + + io.delta.kernel.utils.FileStatus status = client.getFileStatus(second.getAbsolutePath()); + org.junit.Assert.assertEquals(second.length(), status.getSize()); + + java.util.List listed = new java.util.ArrayList<>(); + try (io.delta.kernel.utils.CloseableIterator files = + client.listFrom(second.getAbsolutePath())) { + while (files.hasNext()) { + listed.add(new File(files.next().getPath()).getName()); + } + } + org.junit.Assert.assertEquals( + java.util.Arrays.asList("00000000000000000001.json", "00000000000000000002.json"), listed); + + io.delta.kernel.engine.FileReadRequest request = + new io.delta.kernel.engine.FileReadRequest() { + @Override + public String getPath() { + return second.getAbsolutePath(); + } + + @Override + public int getStartOffset() { + return 2; + } + + @Override + public int getReadLength() { + return 4; + } + }; + try (io.delta.kernel.utils.CloseableIterator streams = + client.readFiles(io.delta.kernel.internal.util.Utils.singletonCloseableIterator(request))) { + org.junit.Assert.assertTrue(streams.hasNext()); + org.junit.Assert.assertEquals( + "2345", new String(streams.next().readAllBytes(), StandardCharsets.UTF_8)); + org.junit.Assert.assertFalse(streams.hasNext()); + } + + File nestedDir = new File(dir, "nested"); + org.junit.Assert.assertTrue(client.mkdirs(nestedDir.getAbsolutePath())); + org.junit.Assert.assertTrue(nestedDir.isDirectory()); + + File copied = new File(dir, "copy.json"); + client.copyFileAtomically(second.getAbsolutePath(), copied.getAbsolutePath(), false); + org.junit.Assert.assertEquals("0123456789", Files.readString(copied.toPath())); + + org.junit.Assert.assertTrue(client.delete(copied.getAbsolutePath())); + org.junit.Assert.assertFalse(copied.exists()); } @Test