Commit ccd2d616 authored by ttingen's avatar ttingen

Fix streams consumer group tests.

parent 15b8dac8
...@@ -243,7 +243,7 @@ public void StreamConsumerGroupReadFromStreamBeginning() ...@@ -243,7 +243,7 @@ public void StreamConsumerGroupReadFromStreamBeginning()
db.StreamCreateConsumerGroup(key, groupName, StreamPosition.Beginning); db.StreamCreateConsumerGroup(key, groupName, StreamPosition.Beginning);
var entries = db.StreamReadGroup(key, groupName, "test_consumer", "0-0"); var entries = db.StreamReadGroup(key, groupName, "test_consumer", StreamPosition.NewMessages);
Assert.Equal(2, entries.Length); Assert.Equal(2, entries.Length);
Assert.True(id1 == entries[0].Id); Assert.True(id1 == entries[0].Id);
...@@ -274,7 +274,7 @@ public void StreamConsumerGroupReadFromStreamBeginningWithCount() ...@@ -274,7 +274,7 @@ public void StreamConsumerGroupReadFromStreamBeginningWithCount()
var entries = db.StreamReadGroup(key, groupName, "test_consumer", StreamPosition.NewMessages, 2); var entries = db.StreamReadGroup(key, groupName, "test_consumer", StreamPosition.NewMessages, 2);
// Ensure we only received the requested count and that the IDs match the expected values. // Ensure we only received the requested count and that the IDs match the expected values.
Assert.Single(entries); Assert.Equal(2, entries.Length);
Assert.True(id2 == entries[0].Id); Assert.True(id2 == entries[0].Id);
Assert.True(id3 == entries[1].Id); Assert.True(id3 == entries[1].Id);
} }
...@@ -301,7 +301,7 @@ public void StreamConsumerGroupAcknowledgeMessage() ...@@ -301,7 +301,7 @@ public void StreamConsumerGroupAcknowledgeMessage()
db.StreamCreateConsumerGroup(key, groupName, StreamPosition.Beginning); db.StreamCreateConsumerGroup(key, groupName, StreamPosition.Beginning);
// Read all 4 messages, they will be assigned to the consumer // Read all 4 messages, they will be assigned to the consumer
var entries = db.StreamReadGroup(key, groupName, consumer, "0-0"); var entries = db.StreamReadGroup(key, groupName, consumer, StreamPosition.NewMessages);
// Send XACK for 3 of the messages // Send XACK for 3 of the messages
...@@ -395,7 +395,7 @@ public void StreamConsumerGroupClaimMessagesReturningIds() ...@@ -395,7 +395,7 @@ public void StreamConsumerGroupClaimMessagesReturningIds()
db.StreamCreateConsumerGroup(key, groupName, StreamPosition.Beginning); db.StreamCreateConsumerGroup(key, groupName, StreamPosition.Beginning);
// Read a single message into the first consumer. // Read a single message into the first consumer.
var consumer1Messages = db.StreamReadGroup(key, groupName, consumer1, StreamPosition.Beginning, 1); var consumer1Messages = db.StreamReadGroup(key, groupName, consumer1, StreamPosition.NewMessages, 1);
// Read the remaining messages into the second consumer. // Read the remaining messages into the second consumer.
var consumer2Messages = db.StreamReadGroup(key, groupName, consumer2); var consumer2Messages = db.StreamReadGroup(key, groupName, consumer2);
...@@ -449,7 +449,7 @@ public void StreamConsumerGroupReadMultipleOneReadBeginningOneReadNew() ...@@ -449,7 +449,7 @@ public void StreamConsumerGroupReadMultipleOneReadBeginningOneReadNew()
db.StreamAdd(stream2, "field2-3", "value2-3"); db.StreamAdd(stream2, "field2-3", "value2-3");
// stream1 set up to read only new messages. // stream1 set up to read only new messages.
db.StreamCreateConsumerGroup(stream1, groupName); db.StreamCreateConsumerGroup(stream1, groupName, StreamPosition.NewMessages);
// stream2 set up to read from the beginning of the stream // stream2 set up to read from the beginning of the stream
db.StreamCreateConsumerGroup(stream2, groupName, StreamPosition.Beginning); db.StreamCreateConsumerGroup(stream2, groupName, StreamPosition.Beginning);
...@@ -457,16 +457,17 @@ public void StreamConsumerGroupReadMultipleOneReadBeginningOneReadNew() ...@@ -457,16 +457,17 @@ public void StreamConsumerGroupReadMultipleOneReadBeginningOneReadNew()
// Read for both streams from the beginning. We shouldn't get anything back for stream1. // Read for both streams from the beginning. We shouldn't get anything back for stream1.
var pairs = new StreamPosition[] var pairs = new StreamPosition[]
{ {
new StreamPosition(stream1, StreamPosition.Beginning), // StreamPosition.NewMessages will send ">" which indicates "Undelivered" messages.
new StreamPosition(stream2, StreamPosition.Beginning) new StreamPosition(stream1, StreamPosition.NewMessages),
new StreamPosition(stream2, StreamPosition.NewMessages)
}; };
var streams = db.StreamReadGroup(pairs, groupName, "test_consumer"); var streams = db.StreamReadGroup(pairs, groupName, "test_consumer");
Assert.NotNull(streams); Assert.NotNull(streams);
Assert.Equal(2, streams.Length); Assert.Single(streams);
Assert.Empty(streams[0].Entries); Assert.Equal(stream2, streams[0].Key);
Assert.Equal(3, streams[1].Entries.Length); Assert.Equal(3, streams[0].Entries.Length);
} }
} }
...@@ -568,15 +569,15 @@ public void StreamConsumerGroupReadMultipleRestrictCount() ...@@ -568,15 +569,15 @@ public void StreamConsumerGroupReadMultipleRestrictCount()
var id2_2 = db.StreamAdd(stream2, "field2-2", "value2-2"); var id2_2 = db.StreamAdd(stream2, "field2-2", "value2-2");
var id2_3 = db.StreamAdd(stream2, "field2-3", "value2-3"); var id2_3 = db.StreamAdd(stream2, "field2-3", "value2-3");
// Allow reading from the beginning in both streams // Set the initial read point in each stream, *after* the first ID in both streams.
db.StreamCreateConsumerGroup(stream1, groupName, StreamPosition.Beginning); db.StreamCreateConsumerGroup(stream1, groupName, id1_1);
db.StreamCreateConsumerGroup(stream2, groupName, StreamPosition.Beginning); db.StreamCreateConsumerGroup(stream2, groupName, id2_1);
var pairs = new StreamPosition[] var pairs = new StreamPosition[]
{ {
// Read after the first id in both streams // Read after the first id in both streams
new StreamPosition(stream1, id1_1), new StreamPosition(stream1, StreamPosition.NewMessages),
new StreamPosition(stream2, id2_1) new StreamPosition(stream2, StreamPosition.NewMessages)
}; };
// Restrict the count to 2 (expect only 1 message from first stream, 2 from the second). // Restrict the count to 2 (expect only 1 message from first stream, 2 from the second).
...@@ -664,7 +665,7 @@ public void StreamConsumerGroupViewPendingInfoSummary() ...@@ -664,7 +665,7 @@ public void StreamConsumerGroupViewPendingInfoSummary()
db.StreamCreateConsumerGroup(key, groupName, StreamPosition.Beginning); db.StreamCreateConsumerGroup(key, groupName, StreamPosition.Beginning);
// Read a single message into the first consumer. // Read a single message into the first consumer.
var consumer1Messages = db.StreamReadGroup(key, groupName, consumer1, StreamPosition.Beginning, 1); var consumer1Messages = db.StreamReadGroup(key, groupName, consumer1, StreamPosition.NewMessages, 1);
// Read the remaining messages into the second consumer. // Read the remaining messages into the second consumer.
var consumer2Messages = db.StreamReadGroup(key, groupName, consumer2); var consumer2Messages = db.StreamReadGroup(key, groupName, consumer2);
...@@ -782,7 +783,7 @@ public void StreamDeleteConsumer() ...@@ -782,7 +783,7 @@ public void StreamDeleteConsumer()
// Create a consumer group and read the message. // Create a consumer group and read the message.
db.StreamCreateConsumerGroup(key, groupName, StreamPosition.Beginning); db.StreamCreateConsumerGroup(key, groupName, StreamPosition.Beginning);
db.StreamReadGroup(key, groupName, consumer, StreamPosition.Beginning); db.StreamReadGroup(key, groupName, consumer, StreamPosition.NewMessages);
var preDeleteConsumers = db.StreamConsumerInfo(key, groupName); var preDeleteConsumers = db.StreamConsumerInfo(key, groupName);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment