Internal change

PiperOrigin-RevId: 492253867
This commit is contained in:
Hadon Nash 2022-12-01 11:33:15 -08:00 committed by Copybara-Service
parent 02aa162c9e
commit 1e2cb2b359
2 changed files with 40 additions and 1 deletions

View File

@ -354,7 +354,9 @@ NodeReadiness SyncSet::GetReadiness(Timestamp* min_stream_timestamp) {
} }
} }
*min_stream_timestamp = std::min(min_packet, min_bound); *min_stream_timestamp = std::min(min_packet, min_bound);
if (*min_stream_timestamp == Timestamp::Done()) { if (*min_stream_timestamp >= Timestamp::OneOverPostStream()) {
// Either OneOverPostStream or Done indicates no more packets.
*min_stream_timestamp = Timestamp::Done();
last_processed_ts_ = Timestamp::Done().PreviousAllowedInStream(); last_processed_ts_ = Timestamp::Done().PreviousAllowedInStream();
return NodeReadiness::kReadyForClose; return NodeReadiness::kReadyForClose;
} }

View File

@ -230,6 +230,43 @@ TEST_F(ImmediateInputStreamHandlerTest, StreamDoneReady) {
input_stream_handler_->ClearCurrentInputs(cc_); input_stream_handler_->ClearCurrentInputs(cc_);
} }
// This test checks that the state is ReadyForClose after all streams reach
// Timestamp::Max.
TEST_F(ImmediateInputStreamHandlerTest, ReadyForCloseAfterTimestampMax) {
Timestamp min_stream_timestamp;
std::list<Packet> packets;
// One packet arrives, ready for process.
packets.push_back(Adopt(new std::string("packet 1")).At(Timestamp(10)));
input_stream_handler_->AddPackets(name_to_id_["input_a"], packets);
EXPECT_TRUE(input_stream_handler_->ScheduleInvocations(
/*max_allowance=*/1, &min_stream_timestamp));
EXPECT_EQ(Timestamp(10), cc_->InputTimestamp());
input_stream_handler_->FinalizeInputSet(cc_->InputTimestamp(),
&cc_->Inputs());
input_stream_handler_->ClearCurrentInputs(cc_);
// No packets arrive, not ready.
EXPECT_FALSE(input_stream_handler_->ScheduleInvocations(
/*max_allowance=*/1, &min_stream_timestamp));
EXPECT_EQ(Timestamp::Unset(), cc_->InputTimestamp());
// Timestamp::Max arrives, ready for close.
input_stream_handler_->SetNextTimestampBound(
name_to_id_["input_a"], Timestamp::Max().NextAllowedInStream());
input_stream_handler_->SetNextTimestampBound(
name_to_id_["input_b"], Timestamp::Max().NextAllowedInStream());
input_stream_handler_->SetNextTimestampBound(
name_to_id_["input_c"], Timestamp::Max().NextAllowedInStream());
EXPECT_TRUE(input_stream_handler_->ScheduleInvocations(
/*max_allowance=*/1, &min_stream_timestamp));
EXPECT_EQ(Timestamp::Done(), cc_->InputTimestamp());
input_stream_handler_->FinalizeInputSet(cc_->InputTimestamp(),
&cc_->Inputs());
input_stream_handler_->ClearCurrentInputs(cc_);
}
// This test checks that when any stream is done, the state is ready to close. // This test checks that when any stream is done, the state is ready to close.
TEST_F(ImmediateInputStreamHandlerTest, ReadyForClose) { TEST_F(ImmediateInputStreamHandlerTest, ReadyForClose) {
Timestamp min_stream_timestamp; Timestamp min_stream_timestamp;