Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 3 additions & 5 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -59,22 +59,21 @@

## Highlights

* New highly anticipated feature X added to Python SDK ([#X](https://github.com/apache/beam/issues/X)).
* New highly anticipated feature Y added to Java SDK ([#Y](https://github.com/apache/beam/issues/Y)).
* Python SDK now supports memory profiling with Memray ([#38853](https://github.com/apache/beam/issues/38853)).
* (Python) Added [Qdrant](https://qdrant.tech/) VectorDatabaseWriteConfig implementation ([#38141](https://github.com/apache/beam/issues/38141)).


## I/Os

* Support for X source added (Java/Python) ([#X](https://github.com/apache/beam/issues/X)).

## New Features / Improvements

* X feature added (Java/Python) ([#X](https://github.com/apache/beam/issues/X)).
* (Java) Enabled state tag encoding v2 by default for new Dataflow Streaming Engine jobs. It can be disabled by passing `--experiments=disable_streaming_engine_state_tag_encoding_v2` or `--updateCompatibilityVersion=2.74.0` pipeline option. Note that the tag encoding version cannot change during a job update. Jobs using tag encoding v2 (enabled by default for new jobs on 2.75.0+) cannot be downgraded to Beam versions prior to 2.73.0, as only versions 2.73.0 and later support tag encoding v2. ([#38705](https://github.com/apache/beam/issues/38705)).
* (Python) Added instrumentation to support off-the-shelf profiling agents when launching Python SDK Harness ([#38853](https://github.com/apache/beam/issues/38853)).

## Breaking Changes

* X behavior was changed ([#X](https://github.com/apache/beam/issues/X)).
* (Python) Typehints of dataclass fields are honored during type inferences. To restore the behavior of fallback-to-any,
use pipeline option `--exclude_infer_dataclass_field_type` ([#38797](https://github.com/apache/beam/issues/38797)).
However fixing forward is recommended.
Expand All @@ -85,7 +84,6 @@

## Bugfixes

* Fixed X (Java/Python) ([#X](https://github.com/apache/beam/issues/X)).
* Fixed IcebergIO writing manifest column bounds padded with trailing `0x00` bytes, which broke equality predicate pushdown in some query engines (Java) ([#38580](https://github.com/apache/beam/issues/38580)).

## Security Fixes
Expand Down
78 changes: 78 additions & 0 deletions sdks/python/apache_beam/options/pipeline_options.py
Original file line number Diff line number Diff line change
Expand Up @@ -1658,6 +1658,84 @@ def _add_argparse_args(cls, parser):
default=1.0,
help='A number between 0 and 1 indicating the ratio '
'of bundles that should be profiled.')
parser.add_argument(
'--profiler_agent',
default=None,
help=(
'Specifies the profiling agent to launch the SDK worker harness '
'with (e.g., "memray", "tcmalloc", or a custom wrapper script/binary).'
))
parser.add_argument(
'--profiler_extra_arg',
'--profiler_extra_args',
dest='profiler_extra_args',
action=_CommaSeparatedListAction,
default=None,
help=
'Comma-separated list of extra arguments to pass to the profiler agent.'
)
parser.add_argument(
'--profiler_extra_env_var',
'--profiler_extra_env_vars',
dest='profiler_extra_env_vars',
action=_CommaSeparatedListAction,
default=None,
help=(
'Comma-separated list of environment variables required by the profiler agent '
'in format "KEY1=VAL1,KEY2=VAL2".'))
parser.add_argument(
'--profile_temp_location',
default=None,
help=(
'Directory path on the worker where local profiles are saved. '
'Defaults to ${semi_persist_dir}/profiles if not specified.'))
parser.add_argument(
'--profile_upload_interval_sec',
type=int,
default=300,
help=(
'Frequency (in seconds) at which the local profiles are uploaded to GCS. '
'Defaults to 300 (5 min).'))
parser.add_argument(
'--profiler_stop_after_sec',
type=int,
default=0,
help=(
'Time limit (in seconds) for profiling a single process. When exceeded, '
'the worker process is restarted without the profiler.'))
parser.add_argument(
'--profiler_stop_after_crash',
action='store_true',
default=False,
help=(
'If True, the profiling agent won\'t be re-enabled after a worker '
'process crash.'))
parser.add_argument(
'--profile_postprocess_interval_sec',
type=int,
default=600,
help=(
'Frequency (in seconds) at which the local profiles are post-processed '
'on-the-fly. Defaults to 600 (10 minutes). Set to 0 to disable.'))

def validate(self, validator):
errors = []
if self.profiler_agent:
if self.profile_cpu or self.profile_memory:
errors.append(
'--profiler_agent is mutually exclusive with --profile_cpu '
'and --profile_memory.')

if self.profile_location and self.profile_location.lower() == 'none':
self.profile_location = None
elif not self.profile_location:
temp_location = self.view_as(GoogleCloudOptions).temp_location
if temp_location:
self.profile_location = temp_location.rstrip('/') + '/profiles'
_LOGGER.info(
'Setting --profile_location to %s since profiling is enabled.',
self.profile_location)
return errors


class SetupOptions(PipelineOptions):
Expand Down
46 changes: 46 additions & 0 deletions sdks/python/apache_beam/options/pipeline_options_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -669,6 +669,52 @@ def test_unknown_option_prefix(self):
options = PipelineOptions(['--type_check_strictness', 'blahblah'])
options.view_as(TypeOptions)

def test_profiling_agent_is_exclusive_with_legacy_profiling_options(self):
options = PipelineOptions(['--profiler_agent=memray'])
validator = PipelineOptionsValidator(options, None)
self.assertEqual(validator.validate(), [])

options = PipelineOptions(['--profiler_agent=memray', '--profile_cpu'])
validator = PipelineOptionsValidator(options, None)
errors = validator.validate()
self.assertTrue(
any('--profiler_agent is mutually exclusive' in err for err in errors))

options = PipelineOptions(['--profiler_agent=memray', '--profile_memory'])
validator = PipelineOptionsValidator(options, None)
errors = validator.validate()
self.assertTrue(
any('--profiler_agent is mutually exclusive' in err for err in errors))

def test_profile_location_defaulting_and_opt_out(self):
options = PipelineOptions(
['--profiler_agent=memray', '--temp_location=gs://bucket/temp'])
validator = PipelineOptionsValidator(options, None)
self.assertEqual(validator.validate(), [])
self.assertEqual(
options.view_as(ProfilingOptions).profile_location,
'gs://bucket/temp/profiles')

options = PipelineOptions([
'--profiler_agent=memray',
'--temp_location=gs://bucket/temp',
'--profile_location=gs://other-bucket/custom_profiles'
])
validator = PipelineOptionsValidator(options, None)
self.assertEqual(validator.validate(), [])
self.assertEqual(
options.view_as(ProfilingOptions).profile_location,
'gs://other-bucket/custom_profiles')

options = PipelineOptions([
'--profiler_agent=memray',
'--temp_location=gs://bucket/temp',
'--profile_location=nOnE'
])
validator = PipelineOptionsValidator(options, None)
self.assertEqual(validator.validate(), [])
self.assertIsNone(options.view_as(ProfilingOptions).profile_location)

def test_add_experiment(self):
options = PipelineOptions([])
options.view_as(DebugOptions).add_experiment('new_experiment')
Expand Down
2 changes: 2 additions & 0 deletions sdks/python/apache_beam/options/pipeline_options_validator.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
from apache_beam.options.pipeline_options import DebugOptions
from apache_beam.options.pipeline_options import GoogleCloudOptions
from apache_beam.options.pipeline_options import PortableOptions
from apache_beam.options.pipeline_options import ProfilingOptions
from apache_beam.options.pipeline_options import SetupOptions
from apache_beam.options.pipeline_options import StandardOptions
from apache_beam.options.pipeline_options import TestOptions
Expand All @@ -55,6 +56,7 @@ class PipelineOptionsValidator(object):
DebugOptions,
GoogleCloudOptions,
PortableOptions,
ProfilingOptions,
SetupOptions,
StandardOptions,
TestOptions,
Expand Down
3 changes: 3 additions & 0 deletions sdks/python/container/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,10 @@ RUN \
ccache \
# Required for using Beam Python SDK on ARM machines.
libgeos-dev \
# Required for memory profiling with tcmalloc.
google-perftools \
&& \

rm -rf /var/lib/apt/lists/* && \

pip install --upgrade pip setuptools wheel && \
Expand Down
2 changes: 2 additions & 0 deletions sdks/python/container/base_image_requirements_manual.txt
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ google-cloud-profiler;python_version<="3.12"
# tests
google-api-python-client;python_version<="3.13"
guppy3
# Explicitly pin memray to use a version compatible with postprocessing logic in Beam.
memray==1.19.3
mmh3 # Optimizes execution of some Beam codepaths. TODO: Make it Beam's dependency.
nltk # Commonly used for natural language processing.
google-crc32c
Expand Down
90 changes: 81 additions & 9 deletions sdks/python/container/boot.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"slices"
"strings"
"sync"
"sync/atomic"
"syscall"
"time"

Expand Down Expand Up @@ -133,7 +134,17 @@ type PipelineOptionsData struct {
}

type OptionsData struct {
Experiments []string `json:"experiments"`
Experiments []string `json:"experiments"`
ProfilerAgent string `json:"profiler_agent"`
ProfilerExtraArgs []string `json:"profiler_extra_args"`
ProfilerExtraEnvVars []string `json:"profiler_extra_env_vars"`
ProfileLocation string `json:"profile_location"`
ProfileTempLocation string `json:"profile_temp_location"`
ProfileUploadIntervalSec int `json:"profile_upload_interval_sec"`
ProfilerStopAfterSec int `json:"profiler_stop_after_sec"`
ProfilerStopAfterCrash bool `json:"profiler_stop_after_crash"`
ProfilePostprocessIntervalSec int `json:"profile_postprocess_interval_sec"`
JobId string `json:"jobId,omitempty"`
}

func getExperiments(options string) []string {
Expand Down Expand Up @@ -198,6 +209,14 @@ func launchSDKProcess() error {
logger.Printf(ctx, "Build isolation disabled when installing packages with pip")
}

var opts PipelineOptionsData
if err := json.Unmarshal([]byte(options), &opts); err != nil {
logger.Warnf(ctx, "Failed to unmarshal pipeline options for profiling config: %v", err)
}

ctx = setupProfilerConfig(ctx, logger, &opts)
startProfilerBackgroundTasks(ctx, logger)

// (2) Retrieve and install the staged packages.
//
// No log.Fatalf() from here on, otherwise deferred cleanups will not be called!
Expand Down Expand Up @@ -314,11 +333,6 @@ func launchSDKProcess() error {
childPids.mu.Unlock()
}()

args := []string{
"-m",
sdkHarnessEntrypoint,
}

var wg sync.WaitGroup
wg.Add(len(workerIds))
for _, workerId := range workerIds {
Expand All @@ -333,12 +347,68 @@ func launchSDKProcess() error {
childPids.mu.Unlock()
return
}
logger.Printf(ctx, "Executing Python (worker %v): python %v", workerId, strings.Join(args, " "))
cmd := StartCommandEnv(map[string]string{"WORKER_ID": workerId}, os.Stdin, bufLogger, bufLogger, "python", args...)

currentProg := "python"
currentArgs := []string{"-m", sdkHarnessEntrypoint}
currentEnv := map[string]string{"WORKER_ID": workerId}

profilingActive := false
currentProg, currentArgs, currentEnv, profilingActive = maybeWithProfiler(
ctx, logger, workerId, currentProg, currentArgs, currentEnv,
)

var envStr string
if len(currentEnv) > 0 {
var envStrings []string
for k, v := range currentEnv {
envStrings = append(envStrings, k+"="+v)
}
slices.Sort(envStrings)
envStr = strings.Join(envStrings, ", ")
}

logger.Printf(ctx, "Executing Python (%v): %v %v", envStr, currentProg, strings.Join(currentArgs, " "))
cmd := StartCommandEnv(currentEnv, os.Stdin, bufLogger, bufLogger, currentProg, currentArgs...)
childPids.v = append(childPids.v, cmd.Process.Pid)
childPids.mu.Unlock()

if err := cmd.Wait(); err != nil {
var timer *time.Timer
var profilingTimedOut atomic.Bool

pcfg := getProfilerConfig(ctx)
if profilingActive && pcfg != nil && pcfg.StopAfterSec > 0 {
duration := time.Duration(pcfg.StopAfterSec) * time.Second
timer = time.AfterFunc(duration, func() {
childPids.mu.Lock()
defer childPids.mu.Unlock()
if cmd.Process != nil {
logger.Printf(ctx, "Profiling timeout of %d seconds reached. Sending SIGINT to worker %s",
pcfg.StopAfterSec, workerId)
profilingTimedOut.Store(true)
syscall.Kill(-cmd.Process.Pid, syscall.SIGINT)
Comment thread
tvalentyn marked this conversation as resolved.
}
})
}

err := cmd.Wait()
if timer != nil {
timer.Stop()
}

if err != nil {
if profilingTimedOut.Load() {
stopProfiling(ctx)
bufLogger.FlushAtDebug(ctx)
logger.Printf(ctx, "Python worker %v terminated after profiling timeout. Restarting without profiler.", workerId)
// Error is not counted toward error budget.
continue
}

if profilingActive && pcfg != nil && pcfg.StopAfterCrash {
stopProfiling(ctx)
logger.Printf(ctx, "Python worker %v crashed. Disabling profiler on subsequent restarts because --profiler_stop_after_crash is enabled.", workerId)
}

// Retry on fatal errors, like OOMs and segfaults, not just
// DoFns throwing exceptions.
errorCount += 1
Expand Down Expand Up @@ -532,3 +602,5 @@ func logSubmissionEnvDependencies(ctx context.Context, bufLogger *tools.Buffered
bufLogger.Printf(ctx, "%s", string(content))
return nil
}


Loading
Loading