apache_beam.io.source_test_utils module
Helper functions and test harnesses for source implementations.
This module contains helper functions and test harnesses for checking
correctness of source (a subclass of iobase.BoundedSource) and range
tracker (a subclass of iobase.RangeTracker) implementations.
It contains a few lightweight utilities (e.g. reading items from a source with
read_from_source()), as well as heavyweight property-testing and stress-testing
harnesses that provide a large amount of test coverage with little code.
Most notable ones are:
* assert_sources_equal_reference_source() helps test that the data read by
the union of sources produced by BoundedSource.split() is the same as the data
read by the original source.
* If your source implements dynamic work rebalancing, use the
assert_split_at_fraction_*() family of functions. They test the behavior of
RangeTracker.try_split(), in particular that various consistency
properties are respected and that the total set of data read by the source is
preserved when splits happen. Use assert_split_at_fraction_behavior() to test
individual cases of RangeTracker.try_split(), and use
assert_split_at_fraction_exhaustive() as a heavyweight stress test that includes
concurrency. We strongly recommend using both.
For example usages, see the unit tests of modules such as:
* apache_beam.io.source_test_utils_test.py
* apache_beam.io.avroio_test.py
- apache_beam.io.source_test_utils.read_from_source(source, start_position=None, stop_position=None)
Reads elements from the given BoundedSource.
Only reads elements within the given position range.
- Parameters:
source (BoundedSource) – the BoundedSource implementation to read from.
start_position (int) – start position for reading.
stop_position (int) – stop position for reading.
- Returns:
the values read from the source.
- Return type:
List[str]
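A minimal sketch of the "read only within a position range" behaviour, using a hypothetical in-memory source (ToyOffsetSource, not a Beam class) whose record positions are simply list indices. It does not use Beam's range-tracker machinery; it only illustrates how a start/stop pair narrows the read and how None can stand for "the whole range".

```python
from typing import Iterator, List, Optional


class ToyOffsetSource:
    """Hypothetical in-memory source; a record's position is its list index."""

    def __init__(self, items: List[str]) -> None:
        self.items = items

    def read(self, start: int, stop: int) -> Iterator[str]:
        # Yield only the records whose position lies in [start, stop).
        for position in range(start, min(stop, len(self.items))):
            yield self.items[position]


def read_from_toy_source(source: ToyOffsetSource,
                         start_position: Optional[int] = None,
                         stop_position: Optional[int] = None) -> List[str]:
    # None stands for "from the first record" / "up to the last record".
    start = 0 if start_position is None else start_position
    stop = len(source.items) if stop_position is None else stop_position
    return list(source.read(start, stop))


source = ToyOffsetSource(["a", "b", "c", "d"])
print(read_from_toy_source(source))        # ['a', 'b', 'c', 'd']
print(read_from_toy_source(source, 1, 3))  # ['b', 'c']
```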
- apache_beam.io.source_test_utils.assert_sources_equal_reference_source(reference_source_info, sources_info)
Tests if a reference source is equal to a given set of sources.
Given a reference source (a BoundedSource and a position range) and a list of sources, assert that the union of the records read from the list of sources is equal to the records read from the reference source.
- Parameters:
reference_source_info (Tuple[BoundedSource, int, int]) – a three-tuple that gives the reference BoundedSource, position to start reading at, and position to stop reading at.
sources_info (Iterable[Tuple[BoundedSource, int, int]]) – a set of sources. Each source is a three-tuple of the same format described above.
- Raises:
ValueError – if the data produced by the reference source and the data produced by the given set of sources are not equivalent.
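The check can be sketched without Beam by modelling each (source, start, stop) tuple as a plain Python sequence plus a slice range; this is a hypothetical simplification, not how BoundedSource reads data. Comparing both sides as multisets makes the result independent of the order in which the split sources are read, while still catching any lost or duplicated record.

```python
from collections import Counter
from typing import List, Sequence, Tuple

# Hypothetical stand-in for a (BoundedSource, start_position, stop_position)
# tuple: the records of the source plus the position range to read.
SourceInfo = Tuple[Sequence[str], int, int]


def read_info(info: SourceInfo) -> List[str]:
    records, start, stop = info
    return list(records[start:stop])


def check_sources_equal_reference(reference: SourceInfo,
                                  sources: List[SourceInfo]) -> None:
    expected = Counter(read_info(reference))
    actual: Counter = Counter()
    for info in sources:
        actual.update(read_info(info))
    # Multiset comparison: order across sources is ignored, but a dropped
    # or duplicated record makes the counts differ.
    if expected != actual:
        raise ValueError(f"reference read {sorted(expected.elements())}, "
                         f"split sources read {sorted(actual.elements())}")


data = ["r0", "r1", "r2", "r3", "r4"]
# Splitting [0, 5) at position 2 gives [0, 2) and [2, 5): this passes.
check_sources_equal_reference((data, 0, 5), [(data, 0, 2), (data, 2, 5)])
```

Overlapping ranges such as [0, 3) and [2, 5) read "r2" twice, so the same call with those ranges raises ValueError.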
- apache_beam.io.source_test_utils.assert_reentrant_reads_succeed(source_info)
Tests if a given source can be read in a reentrant manner.
Assume that the given source produces the set of values {v1, v2, v3, ... vn}. For i in range [1, n-1], this method performs a reentrant read after reading i elements and verifies that both the original and the reentrant reads produce the expected set of values.
- Parameters:
source_info (Tuple[BoundedSource, int, int]) – a three-tuple that gives the reference BoundedSource, position to start reading at, and position to stop reading at.
- Raises:
ValueError – if the source is too trivial or a reentrant read results in an incorrect read.
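The reentrancy property can be sketched with plain Python iterators. Here open_reader is a hypothetical zero-argument factory standing in for "create a new reader over the source"; a correct source hands out independent readers, so opening and draining a second reader in the middle of a first read must not disturb either one.

```python
from itertools import islice
from typing import Callable, Iterator, List


def check_reentrant_reads(open_reader: Callable[[], Iterator[str]]) -> None:
    expected: List[str] = list(open_reader())
    if len(expected) < 2:
        raise ValueError("source is too trivial for a reentrant-read test")
    for i in range(1, len(expected)):
        original = open_reader()
        # Read the first i records from the original reader...
        first_part = list(islice(original, i))
        # ...then open a second, independent reader and drain it completely...
        reentrant = list(open_reader())
        # ...and finally resume the original reader where it stopped.
        rest = list(original)
        if first_part + rest != expected:
            raise ValueError(f"original read was disturbed after {i} records")
        if reentrant != expected:
            raise ValueError(f"reentrant read after {i} records was incorrect")


data = ["v1", "v2", "v3", "v4"]
check_reentrant_reads(lambda: iter(data))  # independent readers: passes
```

A factory that returns one shared iterator every time fails this check, because the second "reader" consumes the first reader's remaining records.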
- apache_beam.io.source_test_utils.assert_split_at_fraction_behavior(source, num_items_to_read_before_split, split_fraction, expected_outcome)
Verifies the behaviour of splitting a source at a given fraction.
Asserts that splitting a BoundedSource either fails after reading num_items_to_read_before_split items, or succeeds in a way that is consistent according to assert_split_at_fraction_succeeds_and_consistent().
- Parameters:
source (BoundedSource) – the source to perform dynamic splitting on.
num_items_to_read_before_split (int) – number of items to read before splitting.
split_fraction (float) – fraction to split at.
expected_outcome (int) – a value from ExpectedSplitOutcome.
- Returns:
a tuple that gives the number of items produced by reading the two ranges produced after dynamic splitting. If splitting did not occur, the first value of the tuple is the number of records read by the source and the second value is -1.
- Return type:
Tuple[int, int]
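A minimal sketch of the three possible expectations, under stated assumptions: ToyTracker is a hypothetical offset tracker over an in-memory list (not a Beam class), its try_split() returns the split position or None rather than a result tuple, and the Outcome enum is a local stand-in whose member names are assumed to mirror ExpectedSplitOutcome. The returned pair follows the convention described above, with -1 when no split happened.

```python
from enum import Enum
from itertools import islice


class Outcome(Enum):
    """Local stand-in; member names assumed to mirror ExpectedSplitOutcome."""
    MUST_SUCCEED_AND_BE_CONSISTENT = 1
    MUST_FAIL = 2
    MUST_BE_CONSISTENT_IF_SUCCEEDS = 3


class ToyTracker:
    """Hypothetical offset tracker over record positions [start, stop)."""

    def __init__(self, start, stop):
        self.start, self.stop = start, stop
        self.last_claimed = start - 1  # no record claimed yet

    def position_for_fraction(self, fraction):
        return self.start + int(fraction * (self.stop - self.start))

    def try_claim(self, position):
        if position >= self.stop:
            return False
        self.last_claimed = position
        return True

    def try_split(self, position):
        # Reject splits at or before the last claimed record or outside the range.
        if position <= self.last_claimed or not self.start < position < self.stop:
            return None
        self.stop = position  # this tracker keeps the primary range
        return position


def read(items, tracker):
    """Yield items[position] for every position the tracker lets us claim."""
    position = tracker.start
    while tracker.try_claim(position):
        yield items[position]
        position += 1


def check_split_behavior(items, num_items_to_read_before_split,
                         split_fraction, expected_outcome):
    tracker = ToyTracker(0, len(items))
    reader = read(items, tracker)
    produced = list(islice(reader, num_items_to_read_before_split))
    split = tracker.try_split(tracker.position_for_fraction(split_fraction))
    produced.extend(reader)  # finish the (possibly shortened) original read
    if split is None:
        if expected_outcome is Outcome.MUST_SUCCEED_AND_BE_CONSISTENT:
            raise ValueError(f"split at {split_fraction} should have succeeded")
        return len(produced), -1  # no split: full read count, then -1
    if expected_outcome is Outcome.MUST_FAIL:
        raise ValueError(f"split at {split_fraction} should have failed")
    # In this toy a position range is just a slice of the list.
    primary, residual = items[:split], items[split:]
    if produced != primary:
        raise ValueError("original reader did not stop at the split position")
    return len(primary), len(residual)


items = list("abcdefghij")
print(check_split_behavior(items, 3, 0.5, Outcome.MUST_SUCCEED_AND_BE_CONSISTENT))  # (5, 5)
print(check_split_behavior(items, 3, 0.2, Outcome.MUST_BE_CONSISTENT_IF_SUCCEEDS))  # (10, -1)
```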
- apache_beam.io.source_test_utils.assert_split_at_fraction_binary(source, expected_items, num_items_to_read_before_split, left_fraction, left_result, right_fraction, right_result, stats, start_position=None, stop_position=None)
Performs dynamic work rebalancing for fractions within a given range.
Asserts that given a start position, a source can be split at every interesting fraction (halfway between two fractions that differ by at least one item) and the results are consistent if a split succeeds.
- Parameters:
source – source to perform dynamic splitting on.
expected_items – total set of items expected when reading the source.
num_items_to_read_before_split – number of items to read before splitting.
left_fraction – left fraction for binary splitting.
left_result – result received by splitting at left fraction.
right_fraction – right fraction for binary splitting.
right_result – result received by splitting at right fraction.
stats – a SplitFractionStatistics for storing results.
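The recursive search over fractions can be sketched independently of any source. In this sketch split_at is a hypothetical callable that performs one split experiment at a fraction and returns (primary_count, residual_count), with -1 as the residual when the split fails; Stats is a local stand-in for SplitFractionStatistics, and the min_width cut-off is an arbitrary choice made here to bound the recursion. The search only descends into a half-interval whose endpoints split differently, which is what steers it toward the "interesting" fractions.

```python
from dataclasses import dataclass, field
from typing import Callable, List, Tuple

# (primary_count, residual_count); a residual of -1 means the split failed.
Result = Tuple[int, int]


@dataclass
class Stats:
    """Hypothetical stand-in for SplitFractionStatistics."""
    successful_fractions: List[float] = field(default_factory=list)
    non_trivial_fractions: List[float] = field(default_factory=list)


def bisect_fractions(split_at: Callable[[float], Result],
                     left: float, left_result: Result,
                     right: float, right_result: Result,
                     stats: Stats, min_width: float = 0.001) -> None:
    if right - left < min_width:
        return  # interval too narrow: stop recursing
    middle = (left + right) / 2
    middle_result = split_at(middle)
    if middle_result[1] != -1:
        stats.successful_fractions.append(middle)
    if middle_result[1] > 0:
        stats.non_trivial_fractions.append(middle)
    # Only halves whose endpoints split differently can hide new outcomes.
    if left_result != middle_result:
        bisect_fractions(split_at, left, left_result, middle, middle_result,
                         stats, min_width)
    # A split at 1.0 usually fails as out of range, so always explore up to it.
    if right == 1.0 or middle_result != right_result:
        bisect_fractions(split_at, middle, middle_result, right, right_result,
                         stats, min_width)


def toy_split_at(fraction: float, n: int = 10, already_read: int = 3) -> Result:
    # Hypothetical model: the split position is int(fraction * n) and must lie
    # strictly inside the range and after the records already read.
    position = int(fraction * n)
    if max(already_read, 1) <= position < n:
        return position, n - position
    return n, -1


stats = Stats()
bisect_fractions(toy_split_at, 0.0, toy_split_at(0.0),
                 1.0, toy_split_at(1.0), stats)
print(sorted({int(f * 10) for f in stats.successful_fractions}))
# [3, 4, 5, 6, 7, 8, 9]
```

With 3 of 10 records already read, the search reaches every split position that can still succeed, even though both endpoints (0.0 and 1.0) fail.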
- apache_beam.io.source_test_utils.assert_split_at_fraction_exhaustive(source, start_position=None, stop_position=None, perform_multi_threaded_test=True)
Performs and tests dynamic work rebalancing exhaustively.
Asserts that for each possible start position, a source can be split at every interesting fraction (halfway between two fractions that differ by at least one item) and the results are consistent if a split succeeds. Verifies multi-threaded splitting as well.
- Parameters:
source (BoundedSource) – the source to perform dynamic splitting on.
perform_multi_threaded_test (bool) – if True, performs a multi-threaded test; otherwise this test is skipped.
- Raises:
ValueError – if the exhaustive splitting test fails.
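A compressed sketch of an exhaustive loop, built on a hypothetical offset tracker (ToyTracker, not a Beam class) whose claims and splits are serialized by a threading.Lock. For every number of records read before the split and every interesting fraction (taken here, as an assumption of the toy, to be the midpoints (k + 0.5) / n between item boundaries), it splits once single-threaded and once while a second thread keeps reading, then checks that the original reader produced exactly the primary range. It does not attempt to reproduce the real harness's repetition or timing strategy.

```python
import threading
from itertools import islice


class ToyTracker:
    """Hypothetical thread-safe offset tracker over positions [start, stop)."""

    def __init__(self, start, stop):
        self.start, self.stop = start, stop
        self.last_claimed = start - 1  # no record claimed yet
        self.lock = threading.Lock()

    def position_for_fraction(self, fraction):
        return self.start + int(fraction * (self.stop - self.start))

    def try_claim(self, position):
        with self.lock:
            if position >= self.stop:
                return False
            self.last_claimed = position
            return True

    def try_split(self, position):
        with self.lock:
            if position <= self.last_claimed or not self.start < position < self.stop:
                return None
            self.stop = position
            return position


def read(items, tracker):
    position = tracker.start
    while tracker.try_claim(position):
        yield items[position]
        position += 1


def split_and_verify(items, num_before_split, fraction, threaded):
    tracker = ToyTracker(0, len(items))
    reader = read(items, tracker)
    produced = list(islice(reader, num_before_split))
    position = tracker.position_for_fraction(fraction)
    if threaded:
        # Drain the reader in a second thread while this thread splits.
        worker = threading.Thread(target=lambda: produced.extend(reader))
        worker.start()
        split = tracker.try_split(position)
        worker.join()
    else:
        split = tracker.try_split(position)
        produced.extend(reader)
    # Whether or not the split won the race, the reader must have produced
    # exactly the primary range (the whole range if the split failed).
    expected = items if split is None else items[:split]
    if produced != expected:
        raise ValueError(f"inconsistent split at {fraction} after "
                         f"{num_before_split} records: {produced}")
    return split


def check_exhaustive(items, perform_multi_threaded_test=True):
    n = len(items)
    modes = (False, True) if perform_multi_threaded_test else (False,)
    for num_before_split in range(n):
        for k in range(n):
            for threaded in modes:
                split_and_verify(items, num_before_split, (k + 0.5) / n, threaded)


check_exhaustive(list("abcdef"))
```

The lock is what makes the concurrent case safe in this toy: a split can only succeed strictly after the last claimed record, so the reading thread always stops exactly at the new boundary.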
- apache_beam.io.source_test_utils.assert_split_at_fraction_fails(source, num_items_to_read_before_split, split_fraction)
Asserts that dynamic work rebalancing at a given fraction fails.
Asserts that trying to perform dynamic splitting after reading num_items_to_read_before_split items from the source fails.
- Parameters:
source – source to perform dynamic splitting on.
num_items_to_read_before_split – number of items to read before splitting.
split_fraction – fraction to split at.
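A minimal sketch of this check against a hypothetical in-memory offset tracker (ToyTracker, not a Beam class) whose try_split() returns None on failure. The example calls cover three typical ways a split can fail: at or before a record that was already read, at the very start of the range, and at the very end.

```python
from itertools import islice


class ToyTracker:
    """Hypothetical offset tracker over record positions [start, stop)."""

    def __init__(self, start, stop):
        self.start, self.stop = start, stop
        self.last_claimed = start - 1  # no record claimed yet

    def position_for_fraction(self, fraction):
        return self.start + int(fraction * (self.stop - self.start))

    def try_claim(self, position):
        if position >= self.stop:
            return False
        self.last_claimed = position
        return True

    def try_split(self, position):
        # Fails (returns None) at or before the last claimed record, or
        # anywhere not strictly inside the range.
        if position <= self.last_claimed or not self.start < position < self.stop:
            return None
        self.stop = position
        return position


def read(items, tracker):
    position = tracker.start
    while tracker.try_claim(position):
        yield items[position]
        position += 1


def check_split_fails(items, num_items_to_read_before_split, split_fraction):
    tracker = ToyTracker(0, len(items))
    reader = read(items, tracker)
    # Consume the first N records so that they count as already claimed.
    list(islice(reader, num_items_to_read_before_split))
    position = tracker.position_for_fraction(split_fraction)
    if tracker.try_split(position) is not None:
        raise ValueError(f"split at {split_fraction} unexpectedly succeeded")


items = list("abcdefghij")
check_split_fails(items, 3, 0.2)  # position 2 was already read
check_split_fails(items, 0, 0.0)  # position 0 is the start of the range
check_split_fails(items, 3, 1.0)  # position 10 is the end of the range
```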
- apache_beam.io.source_test_utils.assert_split_at_fraction_succeeds_and_consistent(source, num_items_to_read_before_split, split_fraction)
Verifies some consistency properties of dynamic work rebalancing.
Equivalent to the following pseudocode:
    original_range_tracker = source.get_range_tracker(None, None)
    original_reader = source.read(original_range_tracker)
    items_before_split = read N items from original_reader
    suggested_split_position = original_range_tracker.position_for_fraction(
        split_fraction)
    original_stop_position = original_range_tracker.stop_position()
    split_result = original_range_tracker.try_split(suggested_split_position)
    split_position, split_fraction = split_result
    primary_range_tracker = source.get_range_tracker(
        original_range_tracker.start_position(), split_position)
    residual_range_tracker = source.get_range_tracker(
        split_position, original_stop_position)
    assert that: items when reading source.read(primary_range_tracker) ==
        items_before_split + items from continuing to read 'original_reader'
    assert that: items when reading source.read(
        source.get_range_tracker(None, None)) ==
        items when reading source.read(primary_range_tracker) +
        items when reading source.read(residual_range_tracker)
- Parameters:
source – source to perform dynamic work rebalancing on.
num_items_to_read_before_split – number of items to read before splitting.
split_fraction – fraction to split at.
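The pseudocode can be turned into a runnable sketch against a hypothetical in-memory offset tracker (ToyTracker, not a Beam class). One simplification to note: its try_split() returns just the split position, or None on failure, instead of the (split_position, split_fraction) pair the pseudocode unpacks. Each step maps onto one line of the pseudocode, including capturing the original stop position before the split shrinks the range.

```python
from itertools import islice


class ToyTracker:
    """Hypothetical offset tracker over record positions [start, stop)."""

    def __init__(self, start, stop):
        self.start, self.stop = start, stop
        self.last_claimed = start - 1  # no record claimed yet

    def position_for_fraction(self, fraction):
        return self.start + int(fraction * (self.stop - self.start))

    def try_claim(self, position):
        if position >= self.stop:
            return False
        self.last_claimed = position
        return True

    def try_split(self, position):
        if position <= self.last_claimed or not self.start < position < self.stop:
            return None
        self.stop = position  # the original tracker keeps the primary range
        return position


def read(items, tracker):
    position = tracker.start
    while tracker.try_claim(position):
        yield items[position]
        position += 1


def check_split_succeeds_and_consistent(items, num_items_to_read_before_split,
                                        split_fraction):
    original = ToyTracker(0, len(items))
    original_reader = read(items, original)
    items_before_split = list(islice(original_reader,
                                     num_items_to_read_before_split))
    suggested = original.position_for_fraction(split_fraction)
    original_stop = original.stop  # capture before try_split() shrinks it
    split_position = original.try_split(suggested)
    if split_position is None:
        raise ValueError(f"split at {split_fraction} did not succeed")
    items_after_split = list(original_reader)
    primary = list(read(items, ToyTracker(original.start, split_position)))
    residual = list(read(items, ToyTracker(split_position, original_stop)))
    # The primary range must hold exactly what the original reader produced.
    if primary != items_before_split + items_after_split:
        raise ValueError("primary range disagrees with the original reader")
    # Primary plus residual must equal an unsplit read of the original range.
    unsplit = list(read(items, ToyTracker(original.start, original_stop)))
    if primary + residual != unsplit:
        raise ValueError("records were lost or duplicated by the split")
    return len(primary), len(residual)


print(check_split_succeeds_and_consistent(list("abcdefghij"), 3, 0.5))  # (5, 5)
```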