Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Submit feedback
Contribute to GitLab
Sign in / Register
Toggle navigation
S
StackExchange.Redis
Project
Project
Details
Activity
Releases
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Boards
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
tsai
StackExchange.Redis
Commits
4054ad34
Commit
4054ad34
authored
Aug 09, 2018
by
Marc Gravell
Browse files
Options
Browse Files
Download
Plain Diff
Merge branch 'master' into pipelines
parents
3dc56ba1
a4d087f6
Changes
10
Show whitespace changes
Inline
Side-by-side
Showing
10 changed files
with
161 additions
and
190 deletions
+161
-190
DatabaseWrapperTests.cs
StackExchange.Redis.Tests/DatabaseWrapperTests.cs
+8
-8
Streams.cs
StackExchange.Redis.Tests/Streams.cs
+72
-72
WrapperBaseTests.cs
StackExchange.Redis.Tests/WrapperBaseTests.cs
+8
-8
IDatabase.cs
...xchange.Redis/StackExchange/Redis/Interfaces/IDatabase.cs
+6
-6
IDatabaseAsync.cs
...ge.Redis/StackExchange/Redis/Interfaces/IDatabaseAsync.cs
+6
-6
DatabaseWrapper.cs
.../StackExchange/Redis/KeyspaceIsolation/DatabaseWrapper.cs
+4
-4
WrapperBase.cs
...edis/StackExchange/Redis/KeyspaceIsolation/WrapperBase.cs
+4
-4
Position.cs
StackExchange.Redis/StackExchange/Redis/Position.cs
+0
-57
RedisDatabase.cs
StackExchange.Redis/StackExchange/Redis/RedisDatabase.cs
+22
-22
StreamPosition.cs
StackExchange.Redis/StackExchange/Redis/StreamPosition.cs
+31
-3
No files found.
StackExchange.Redis.Tests/DatabaseWrapperTests.cs
View file @
4054ad34
...
@@ -841,8 +841,8 @@ public void StreamClaimMessagesReturningIds()
...
@@ -841,8 +841,8 @@ public void StreamClaimMessagesReturningIds()
[
Fact
]
[
Fact
]
public
void
StreamConsumerGroupSetPosition
()
public
void
StreamConsumerGroupSetPosition
()
{
{
wrapper
.
StreamConsumerGroupSetPosition
(
"key"
,
"group"
,
Position
.
Beginning
,
CommandFlags
.
HighPriority
);
wrapper
.
StreamConsumerGroupSetPosition
(
"key"
,
"group"
,
Stream
Position
.
Beginning
,
CommandFlags
.
HighPriority
);
mock
.
Verify
(
_
=>
_
.
StreamConsumerGroupSetPosition
(
"prefix:key"
,
"group"
,
Position
.
Beginning
,
CommandFlags
.
HighPriority
));
mock
.
Verify
(
_
=>
_
.
StreamConsumerGroupSetPosition
(
"prefix:key"
,
"group"
,
Stream
Position
.
Beginning
,
CommandFlags
.
HighPriority
));
}
}
[
Fact
]
[
Fact
]
...
@@ -855,8 +855,8 @@ public void StreamConsumerInfoGet()
...
@@ -855,8 +855,8 @@ public void StreamConsumerInfoGet()
[
Fact
]
[
Fact
]
public
void
StreamCreateConsumerGroup
()
public
void
StreamCreateConsumerGroup
()
{
{
wrapper
.
StreamCreateConsumerGroup
(
"key"
,
"group"
,
Position
.
Beginning
,
CommandFlags
.
HighPriority
);
wrapper
.
StreamCreateConsumerGroup
(
"key"
,
"group"
,
Stream
Position
.
Beginning
,
CommandFlags
.
HighPriority
);
mock
.
Verify
(
_
=>
_
.
StreamCreateConsumerGroup
(
"prefix:key"
,
"group"
,
Position
.
Beginning
,
CommandFlags
.
HighPriority
));
mock
.
Verify
(
_
=>
_
.
StreamCreateConsumerGroup
(
"prefix:key"
,
"group"
,
Stream
Position
.
Beginning
,
CommandFlags
.
HighPriority
));
}
}
[
Fact
]
[
Fact
]
...
@@ -934,15 +934,15 @@ public void StreamRead_1()
...
@@ -934,15 +934,15 @@ public void StreamRead_1()
[
Fact
]
[
Fact
]
public
void
StreamRead_2
()
public
void
StreamRead_2
()
{
{
wrapper
.
StreamRead
(
"key"
,
new
Position
(
"0-0"
)
,
null
,
CommandFlags
.
HighPriority
);
wrapper
.
StreamRead
(
"key"
,
"0-0"
,
null
,
CommandFlags
.
HighPriority
);
mock
.
Verify
(
_
=>
_
.
StreamRead
(
"prefix:key"
,
new
Position
(
"0-0"
)
,
null
,
CommandFlags
.
HighPriority
));
mock
.
Verify
(
_
=>
_
.
StreamRead
(
"prefix:key"
,
"0-0"
,
null
,
CommandFlags
.
HighPriority
));
}
}
[
Fact
]
[
Fact
]
public
void
StreamStreamReadGroup_1
()
public
void
StreamStreamReadGroup_1
()
{
{
wrapper
.
StreamReadGroup
(
"key"
,
"group"
,
"consumer"
,
new
Position
(
"0-0"
)
,
10
,
CommandFlags
.
HighPriority
);
wrapper
.
StreamReadGroup
(
"key"
,
"group"
,
"consumer"
,
"0-0"
,
10
,
CommandFlags
.
HighPriority
);
mock
.
Verify
(
_
=>
_
.
StreamReadGroup
(
"prefix:key"
,
"group"
,
"consumer"
,
new
Position
(
"0-0"
)
,
10
,
CommandFlags
.
HighPriority
));
mock
.
Verify
(
_
=>
_
.
StreamReadGroup
(
"prefix:key"
,
"group"
,
"consumer"
,
"0-0"
,
10
,
CommandFlags
.
HighPriority
));
}
}
[
Fact
]
[
Fact
]
...
...
StackExchange.Redis.Tests/Streams.cs
View file @
4054ad34
...
@@ -135,15 +135,15 @@ public void StreamConsumerGroupSetId()
...
@@ -135,15 +135,15 @@ public void StreamConsumerGroupSetId()
db
.
StreamAdd
(
key
,
"field2"
,
"value2"
);
db
.
StreamAdd
(
key
,
"field2"
,
"value2"
);
// Create a group and set the position to deliver new messages only.
// Create a group and set the position to deliver new messages only.
db
.
StreamCreateConsumerGroup
(
key
,
groupName
,
Position
.
New
);
db
.
StreamCreateConsumerGroup
(
key
,
groupName
,
StreamPosition
.
NewMessages
);
// Read into the group, expect nothing
// Read into the group, expect nothing
var
firstRead
=
db
.
StreamReadGroup
(
key
,
groupName
,
consumer
,
Position
.
New
);
var
firstRead
=
db
.
StreamReadGroup
(
key
,
groupName
,
consumer
,
StreamPosition
.
NewMessages
);
// Reset the ID back to read from the beginning.
// Reset the ID back to read from the beginning.
db
.
StreamConsumerGroupSetPosition
(
key
,
groupName
,
Position
.
Beginning
);
db
.
StreamConsumerGroupSetPosition
(
key
,
groupName
,
Stream
Position
.
Beginning
);
var
secondRead
=
db
.
StreamReadGroup
(
key
,
groupName
,
consumer
,
Position
.
New
);
var
secondRead
=
db
.
StreamReadGroup
(
key
,
groupName
,
consumer
,
StreamPosition
.
NewMessages
);
Assert
.
NotNull
(
firstRead
);
Assert
.
NotNull
(
firstRead
);
Assert
.
NotNull
(
secondRead
);
Assert
.
NotNull
(
secondRead
);
...
@@ -168,7 +168,7 @@ public void StreamConsumerGroupWithNoConsumers()
...
@@ -168,7 +168,7 @@ public void StreamConsumerGroupWithNoConsumers()
db
.
StreamAdd
(
key
,
"field1"
,
"value1"
);
db
.
StreamAdd
(
key
,
"field1"
,
"value1"
);
// Create a group
// Create a group
db
.
StreamCreateConsumerGroup
(
key
,
groupName
,
new
Position
(
"0-0"
)
);
db
.
StreamCreateConsumerGroup
(
key
,
groupName
,
"0-0"
);
// Query redis for the group consumers, expect an empty list in response.
// Query redis for the group consumers, expect an empty list in response.
var
consumers
=
db
.
StreamConsumerInfo
(
key
,
groupName
);
var
consumers
=
db
.
StreamConsumerInfo
(
key
,
groupName
);
...
@@ -193,7 +193,7 @@ public void StreamCreateConsumerGroup()
...
@@ -193,7 +193,7 @@ public void StreamCreateConsumerGroup()
db
.
StreamAdd
(
key
,
"field1"
,
"value1"
);
db
.
StreamAdd
(
key
,
"field1"
,
"value1"
);
// Create a group
// Create a group
var
result
=
db
.
StreamCreateConsumerGroup
(
key
,
groupName
,
Position
.
Beginning
);
var
result
=
db
.
StreamCreateConsumerGroup
(
key
,
groupName
,
Stream
Position
.
Beginning
);
Assert
.
True
(
result
);
Assert
.
True
(
result
);
}
}
...
@@ -219,7 +219,7 @@ public void StreamConsumerGroupReadOnlyNewMessagesWithEmptyResponse()
...
@@ -219,7 +219,7 @@ public void StreamConsumerGroupReadOnlyNewMessagesWithEmptyResponse()
db
.
StreamCreateConsumerGroup
(
key
,
groupName
);
db
.
StreamCreateConsumerGroup
(
key
,
groupName
);
// Read, expect no messages
// Read, expect no messages
var
entries
=
db
.
StreamReadGroup
(
key
,
groupName
,
"test_consumer"
,
new
Position
(
"0-0"
)
);
var
entries
=
db
.
StreamReadGroup
(
key
,
groupName
,
"test_consumer"
,
"0-0"
);
Assert
.
True
(
entries
.
Length
==
0
);
Assert
.
True
(
entries
.
Length
==
0
);
}
}
...
@@ -240,9 +240,9 @@ public void StreamConsumerGroupReadFromStreamBeginning()
...
@@ -240,9 +240,9 @@ public void StreamConsumerGroupReadFromStreamBeginning()
var
id1
=
db
.
StreamAdd
(
key
,
"field1"
,
"value1"
);
var
id1
=
db
.
StreamAdd
(
key
,
"field1"
,
"value1"
);
var
id2
=
db
.
StreamAdd
(
key
,
"field2"
,
"value2"
);
var
id2
=
db
.
StreamAdd
(
key
,
"field2"
,
"value2"
);
db
.
StreamCreateConsumerGroup
(
key
,
groupName
,
Position
.
Beginning
);
db
.
StreamCreateConsumerGroup
(
key
,
groupName
,
Stream
Position
.
Beginning
);
var
entries
=
db
.
StreamReadGroup
(
key
,
groupName
,
"test_consumer"
,
new
Position
(
"0-0"
)
);
var
entries
=
db
.
StreamReadGroup
(
key
,
groupName
,
"test_consumer"
,
"0-0"
);
Assert
.
True
(
entries
.
Length
==
2
);
Assert
.
True
(
entries
.
Length
==
2
);
Assert
.
True
(
id1
==
entries
[
0
].
Id
);
Assert
.
True
(
id1
==
entries
[
0
].
Id
);
...
@@ -268,9 +268,9 @@ public void StreamConsumerGroupReadFromStreamBeginningWithCount()
...
@@ -268,9 +268,9 @@ public void StreamConsumerGroupReadFromStreamBeginningWithCount()
var
id4
=
db
.
StreamAdd
(
key
,
"field4"
,
"value4"
);
var
id4
=
db
.
StreamAdd
(
key
,
"field4"
,
"value4"
);
// Start reading after id1.
// Start reading after id1.
db
.
StreamCreateConsumerGroup
(
key
,
groupName
,
new
Position
(
id1
)
);
db
.
StreamCreateConsumerGroup
(
key
,
groupName
,
id1
);
var
entries
=
db
.
StreamReadGroup
(
key
,
groupName
,
"test_consumer"
,
Position
.
New
,
2
);
var
entries
=
db
.
StreamReadGroup
(
key
,
groupName
,
"test_consumer"
,
StreamPosition
.
NewMessages
,
2
);
// Ensure we only received the requested count and that the IDs match the expected values.
// Ensure we only received the requested count and that the IDs match the expected values.
Assert
.
True
(
entries
.
Length
==
2
);
Assert
.
True
(
entries
.
Length
==
2
);
...
@@ -297,10 +297,10 @@ public void StreamConsumerGroupAcknowledgeMessage()
...
@@ -297,10 +297,10 @@ public void StreamConsumerGroupAcknowledgeMessage()
var
id3
=
db
.
StreamAdd
(
key
,
"field3"
,
"value3"
);
var
id3
=
db
.
StreamAdd
(
key
,
"field3"
,
"value3"
);
var
id4
=
db
.
StreamAdd
(
key
,
"field4"
,
"value4"
);
var
id4
=
db
.
StreamAdd
(
key
,
"field4"
,
"value4"
);
db
.
StreamCreateConsumerGroup
(
key
,
groupName
,
Position
.
Beginning
);
db
.
StreamCreateConsumerGroup
(
key
,
groupName
,
Stream
Position
.
Beginning
);
// Read all 4 messages, they will be assigned to the consumer
// Read all 4 messages, they will be assigned to the consumer
var
entries
=
db
.
StreamReadGroup
(
key
,
groupName
,
consumer
,
new
Position
(
"0-0"
)
);
var
entries
=
db
.
StreamReadGroup
(
key
,
groupName
,
consumer
,
"0-0"
);
// Send XACK for 3 of the messages
// Send XACK for 3 of the messages
...
@@ -311,7 +311,7 @@ public void StreamConsumerGroupAcknowledgeMessage()
...
@@ -311,7 +311,7 @@ public void StreamConsumerGroupAcknowledgeMessage()
var
twoAck
=
db
.
StreamAcknowledge
(
key
,
groupName
,
new
RedisValue
[]
{
id3
,
id4
});
var
twoAck
=
db
.
StreamAcknowledge
(
key
,
groupName
,
new
RedisValue
[]
{
id3
,
id4
});
// Read the group again, it should only return the unacknowledged message.
// Read the group again, it should only return the unacknowledged message.
var
notAcknowledged
=
db
.
StreamReadGroup
(
key
,
groupName
,
consumer
,
new
Position
(
"0-0"
)
);
var
notAcknowledged
=
db
.
StreamReadGroup
(
key
,
groupName
,
consumer
,
"0-0"
);
Assert
.
True
(
entries
.
Length
==
4
);
Assert
.
True
(
entries
.
Length
==
4
);
Assert
.
Equal
(
1
,
oneAck
);
Assert
.
Equal
(
1
,
oneAck
);
...
@@ -340,7 +340,7 @@ public void StreamConsumerGroupClaimMessages()
...
@@ -340,7 +340,7 @@ public void StreamConsumerGroupClaimMessages()
var
id3
=
db
.
StreamAdd
(
key
,
"field3"
,
"value3"
);
var
id3
=
db
.
StreamAdd
(
key
,
"field3"
,
"value3"
);
var
id4
=
db
.
StreamAdd
(
key
,
"field4"
,
"value4"
);
var
id4
=
db
.
StreamAdd
(
key
,
"field4"
,
"value4"
);
db
.
StreamCreateConsumerGroup
(
key
,
groupName
,
new
Position
(
"0-0"
)
);
db
.
StreamCreateConsumerGroup
(
key
,
groupName
,
"0-0"
);
// Read a single message into the first consumer.
// Read a single message into the first consumer.
db
.
StreamReadGroup
(
key
,
groupName
,
consumer1
,
count
:
1
);
db
.
StreamReadGroup
(
key
,
groupName
,
consumer1
,
count
:
1
);
...
@@ -391,10 +391,10 @@ public void StreamConsumerGroupClaimMessagesReturningIds()
...
@@ -391,10 +391,10 @@ public void StreamConsumerGroupClaimMessagesReturningIds()
var
id3
=
db
.
StreamAdd
(
key
,
"field3"
,
"value3"
);
var
id3
=
db
.
StreamAdd
(
key
,
"field3"
,
"value3"
);
var
id4
=
db
.
StreamAdd
(
key
,
"field4"
,
"value4"
);
var
id4
=
db
.
StreamAdd
(
key
,
"field4"
,
"value4"
);
db
.
StreamCreateConsumerGroup
(
key
,
groupName
,
Position
.
Beginning
);
db
.
StreamCreateConsumerGroup
(
key
,
groupName
,
Stream
Position
.
Beginning
);
// Read a single message into the first consumer.
// Read a single message into the first consumer.
var
consumer1Messages
=
db
.
StreamReadGroup
(
key
,
groupName
,
consumer1
,
Position
.
Beginning
,
1
);
var
consumer1Messages
=
db
.
StreamReadGroup
(
key
,
groupName
,
consumer1
,
Stream
Position
.
Beginning
,
1
);
// Read the remaining messages into the second consumer.
// Read the remaining messages into the second consumer.
var
consumer2Messages
=
db
.
StreamReadGroup
(
key
,
groupName
,
consumer2
);
var
consumer2Messages
=
db
.
StreamReadGroup
(
key
,
groupName
,
consumer2
);
...
@@ -451,13 +451,13 @@ public void StreamConsumerGroupReadMultipleOneReadBeginningOneReadNew()
...
@@ -451,13 +451,13 @@ public void StreamConsumerGroupReadMultipleOneReadBeginningOneReadNew()
db
.
StreamCreateConsumerGroup
(
stream1
,
groupName
);
db
.
StreamCreateConsumerGroup
(
stream1
,
groupName
);
// stream2 set up to read from the beginning of the stream
// stream2 set up to read from the beginning of the stream
db
.
StreamCreateConsumerGroup
(
stream2
,
groupName
,
Position
.
Beginning
);
db
.
StreamCreateConsumerGroup
(
stream2
,
groupName
,
Stream
Position
.
Beginning
);
// Read for both streams from the beginning. We shouldn't get anything back for stream1.
// Read for both streams from the beginning. We shouldn't get anything back for stream1.
var
pairs
=
new
StreamPosition
[]
var
pairs
=
new
StreamPosition
[]
{
{
new
StreamPosition
(
stream1
,
Position
.
Beginning
),
new
StreamPosition
(
stream1
,
Stream
Position
.
Beginning
),
new
StreamPosition
(
stream2
,
Position
.
Beginning
)
new
StreamPosition
(
stream2
,
Stream
Position
.
Beginning
)
};
};
var
streams
=
db
.
StreamReadGroup
(
pairs
,
groupName
,
"test_consumer"
);
var
streams
=
db
.
StreamReadGroup
(
pairs
,
groupName
,
"test_consumer"
);
...
@@ -492,8 +492,8 @@ public void StreamConsumerGroupReadMultipleOnlyNewMessagesExpectNoResult()
...
@@ -492,8 +492,8 @@ public void StreamConsumerGroupReadMultipleOnlyNewMessagesExpectNoResult()
// We shouldn't get anything for either stream.
// We shouldn't get anything for either stream.
var
pairs
=
new
StreamPosition
[]
var
pairs
=
new
StreamPosition
[]
{
{
new
StreamPosition
(
stream1
,
Position
.
Beginning
),
new
StreamPosition
(
stream1
,
Stream
Position
.
Beginning
),
new
StreamPosition
(
stream2
,
Position
.
Beginning
)
new
StreamPosition
(
stream2
,
Stream
Position
.
Beginning
)
};
};
var
streams
=
db
.
StreamReadGroup
(
pairs
,
groupName
,
"test_consumer"
);
var
streams
=
db
.
StreamReadGroup
(
pairs
,
groupName
,
"test_consumer"
);
...
@@ -533,8 +533,8 @@ public void StreamConsumerGroupReadMultipleOnlyNewMessagesExpect1Result()
...
@@ -533,8 +533,8 @@ public void StreamConsumerGroupReadMultipleOnlyNewMessagesExpect1Result()
// Read the new messages (messages created after the group was created).
// Read the new messages (messages created after the group was created).
var
pairs
=
new
StreamPosition
[]
var
pairs
=
new
StreamPosition
[]
{
{
new
StreamPosition
(
stream1
,
Position
.
New
),
new
StreamPosition
(
stream1
,
StreamPosition
.
NewMessages
),
new
StreamPosition
(
stream2
,
Position
.
New
)
new
StreamPosition
(
stream2
,
StreamPosition
.
NewMessages
)
};
};
var
streams
=
db
.
StreamReadGroup
(
pairs
,
groupName
,
"test_consumer"
);
var
streams
=
db
.
StreamReadGroup
(
pairs
,
groupName
,
"test_consumer"
);
...
@@ -569,14 +569,14 @@ public void StreamConsumerGroupReadMultipleRestrictCount()
...
@@ -569,14 +569,14 @@ public void StreamConsumerGroupReadMultipleRestrictCount()
var
id2_3
=
db
.
StreamAdd
(
stream2
,
"field2-3"
,
"value2-3"
);
var
id2_3
=
db
.
StreamAdd
(
stream2
,
"field2-3"
,
"value2-3"
);
// Allow reading from the beginning in both streams
// Allow reading from the beginning in both streams
db
.
StreamCreateConsumerGroup
(
stream1
,
groupName
,
Position
.
Beginning
);
db
.
StreamCreateConsumerGroup
(
stream1
,
groupName
,
Stream
Position
.
Beginning
);
db
.
StreamCreateConsumerGroup
(
stream2
,
groupName
,
Position
.
Beginning
);
db
.
StreamCreateConsumerGroup
(
stream2
,
groupName
,
Stream
Position
.
Beginning
);
var
pairs
=
new
StreamPosition
[]
var
pairs
=
new
StreamPosition
[]
{
{
// Read after the first id in both streams
// Read after the first id in both streams
new
StreamPosition
(
stream1
,
new
Position
(
id1_1
)
),
new
StreamPosition
(
stream1
,
id1_1
),
new
StreamPosition
(
stream2
,
new
Position
(
id2_1
)
)
new
StreamPosition
(
stream2
,
id2_1
)
};
};
// Restrict the count to 2 (expect only 1 message from first stream, 2 from the second).
// Restrict the count to 2 (expect only 1 message from first stream, 2 from the second).
...
@@ -604,7 +604,7 @@ public void StreamConsumerGroupViewPendingInfoNoConsumers()
...
@@ -604,7 +604,7 @@ public void StreamConsumerGroupViewPendingInfoNoConsumers()
var
id1
=
db
.
StreamAdd
(
key
,
"field1"
,
"value1"
);
var
id1
=
db
.
StreamAdd
(
key
,
"field1"
,
"value1"
);
db
.
StreamCreateConsumerGroup
(
key
,
groupName
,
Position
.
Beginning
);
db
.
StreamCreateConsumerGroup
(
key
,
groupName
,
Stream
Position
.
Beginning
);
var
pendingInfo
=
db
.
StreamPending
(
key
,
groupName
);
var
pendingInfo
=
db
.
StreamPending
(
key
,
groupName
);
...
@@ -630,7 +630,7 @@ public void StreamConsumerGroupViewPendingInfoWhenNothingPending()
...
@@ -630,7 +630,7 @@ public void StreamConsumerGroupViewPendingInfoWhenNothingPending()
var
id1
=
db
.
StreamAdd
(
key
,
"field1"
,
"value1"
);
var
id1
=
db
.
StreamAdd
(
key
,
"field1"
,
"value1"
);
db
.
StreamCreateConsumerGroup
(
key
,
groupName
,
new
Position
(
"0-0"
)
);
db
.
StreamCreateConsumerGroup
(
key
,
groupName
,
"0-0"
);
var
pendingMessages
=
db
.
StreamPendingMessages
(
key
,
var
pendingMessages
=
db
.
StreamPendingMessages
(
key
,
groupName
,
groupName
,
...
@@ -661,10 +661,10 @@ public void StreamConsumerGroupViewPendingInfoSummary()
...
@@ -661,10 +661,10 @@ public void StreamConsumerGroupViewPendingInfoSummary()
var
id3
=
db
.
StreamAdd
(
key
,
"field3"
,
"value3"
);
var
id3
=
db
.
StreamAdd
(
key
,
"field3"
,
"value3"
);
var
id4
=
db
.
StreamAdd
(
key
,
"field4"
,
"value4"
);
var
id4
=
db
.
StreamAdd
(
key
,
"field4"
,
"value4"
);
db
.
StreamCreateConsumerGroup
(
key
,
groupName
,
Position
.
Beginning
);
db
.
StreamCreateConsumerGroup
(
key
,
groupName
,
Stream
Position
.
Beginning
);
// Read a single message into the first consumer.
// Read a single message into the first consumer.
var
consumer1Messages
=
db
.
StreamReadGroup
(
key
,
groupName
,
consumer1
,
Position
.
Beginning
,
1
);
var
consumer1Messages
=
db
.
StreamReadGroup
(
key
,
groupName
,
consumer1
,
Stream
Position
.
Beginning
,
1
);
// Read the remaining messages into the second consumer.
// Read the remaining messages into the second consumer.
var
consumer2Messages
=
db
.
StreamReadGroup
(
key
,
groupName
,
consumer2
);
var
consumer2Messages
=
db
.
StreamReadGroup
(
key
,
groupName
,
consumer2
);
...
@@ -703,7 +703,7 @@ public async Task StreamConsumerGroupViewPendingMessageInfo()
...
@@ -703,7 +703,7 @@ public async Task StreamConsumerGroupViewPendingMessageInfo()
var
id3
=
db
.
StreamAdd
(
key
,
"field3"
,
"value3"
);
var
id3
=
db
.
StreamAdd
(
key
,
"field3"
,
"value3"
);
var
id4
=
db
.
StreamAdd
(
key
,
"field4"
,
"value4"
);
var
id4
=
db
.
StreamAdd
(
key
,
"field4"
,
"value4"
);
db
.
StreamCreateConsumerGroup
(
key
,
groupName
,
Position
.
Beginning
);
db
.
StreamCreateConsumerGroup
(
key
,
groupName
,
Stream
Position
.
Beginning
);
// Read a single message into the first consumer.
// Read a single message into the first consumer.
var
consumer1Messages
=
db
.
StreamReadGroup
(
key
,
groupName
,
consumer1
,
count
:
1
);
var
consumer1Messages
=
db
.
StreamReadGroup
(
key
,
groupName
,
consumer1
,
count
:
1
);
...
@@ -744,7 +744,7 @@ public void StreamConsumerGroupViewPendingMessageInfoForConsumer()
...
@@ -744,7 +744,7 @@ public void StreamConsumerGroupViewPendingMessageInfoForConsumer()
var
id3
=
db
.
StreamAdd
(
key
,
"field3"
,
"value3"
);
var
id3
=
db
.
StreamAdd
(
key
,
"field3"
,
"value3"
);
var
id4
=
db
.
StreamAdd
(
key
,
"field4"
,
"value4"
);
var
id4
=
db
.
StreamAdd
(
key
,
"field4"
,
"value4"
);
db
.
StreamCreateConsumerGroup
(
key
,
groupName
,
Position
.
Beginning
);
db
.
StreamCreateConsumerGroup
(
key
,
groupName
,
Stream
Position
.
Beginning
);
// Read a single message into the first consumer.
// Read a single message into the first consumer.
var
consumer1Messages
=
db
.
StreamReadGroup
(
key
,
groupName
,
consumer1
,
count
:
1
);
var
consumer1Messages
=
db
.
StreamReadGroup
(
key
,
groupName
,
consumer1
,
count
:
1
);
...
@@ -781,8 +781,8 @@ public void StreamDeleteConsumer()
...
@@ -781,8 +781,8 @@ public void StreamDeleteConsumer()
db
.
StreamAdd
(
key
,
"fiedl2"
,
"value2"
);
db
.
StreamAdd
(
key
,
"fiedl2"
,
"value2"
);
// Create a consumer group and read the message.
// Create a consumer group and read the message.
db
.
StreamCreateConsumerGroup
(
key
,
groupName
,
Position
.
Beginning
);
db
.
StreamCreateConsumerGroup
(
key
,
groupName
,
Stream
Position
.
Beginning
);
db
.
StreamReadGroup
(
key
,
groupName
,
consumer
,
Position
.
Beginning
);
db
.
StreamReadGroup
(
key
,
groupName
,
consumer
,
Stream
Position
.
Beginning
);
var
preDeleteConsumers
=
db
.
StreamConsumerInfo
(
key
,
groupName
);
var
preDeleteConsumers
=
db
.
StreamConsumerInfo
(
key
,
groupName
);
...
@@ -815,8 +815,8 @@ public void StreamDeleteConsumerGroup()
...
@@ -815,8 +815,8 @@ public void StreamDeleteConsumerGroup()
db
.
StreamAdd
(
key
,
"field1"
,
"value1"
);
db
.
StreamAdd
(
key
,
"field1"
,
"value1"
);
// Create a consumer group and read the messages.
// Create a consumer group and read the messages.
db
.
StreamCreateConsumerGroup
(
key
,
groupName
,
Position
.
Beginning
);
db
.
StreamCreateConsumerGroup
(
key
,
groupName
,
Stream
Position
.
Beginning
);
db
.
StreamReadGroup
(
key
,
groupName
,
consumer
,
Position
.
Beginning
);
db
.
StreamReadGroup
(
key
,
groupName
,
consumer
,
Stream
Position
.
Beginning
);
var
preDeleteInfo
=
db
.
StreamInfo
(
key
);
var
preDeleteInfo
=
db
.
StreamInfo
(
key
);
...
@@ -899,8 +899,8 @@ public void StreamGroupInfoGet()
...
@@ -899,8 +899,8 @@ public void StreamGroupInfoGet()
var
id3
=
db
.
StreamAdd
(
key
,
"field3"
,
"value3"
);
var
id3
=
db
.
StreamAdd
(
key
,
"field3"
,
"value3"
);
var
id4
=
db
.
StreamAdd
(
key
,
"field4"
,
"value4"
);
var
id4
=
db
.
StreamAdd
(
key
,
"field4"
,
"value4"
);
db
.
StreamCreateConsumerGroup
(
key
,
group1
,
Position
.
Beginning
);
db
.
StreamCreateConsumerGroup
(
key
,
group1
,
Stream
Position
.
Beginning
);
db
.
StreamCreateConsumerGroup
(
key
,
group2
,
Position
.
Beginning
);
db
.
StreamCreateConsumerGroup
(
key
,
group2
,
Stream
Position
.
Beginning
);
// Read a single message into the first consumer.
// Read a single message into the first consumer.
var
consumer1Messages
=
db
.
StreamReadGroup
(
key
,
group1
,
consumer1
,
count
:
1
);
var
consumer1Messages
=
db
.
StreamReadGroup
(
key
,
group1
,
consumer1
,
count
:
1
);
...
@@ -940,7 +940,7 @@ public void StreamGroupConsumerInfoGet()
...
@@ -940,7 +940,7 @@ public void StreamGroupConsumerInfoGet()
var
id3
=
db
.
StreamAdd
(
key
,
"field3"
,
"value3"
);
var
id3
=
db
.
StreamAdd
(
key
,
"field3"
,
"value3"
);
var
id4
=
db
.
StreamAdd
(
key
,
"field4"
,
"value4"
);
var
id4
=
db
.
StreamAdd
(
key
,
"field4"
,
"value4"
);
db
.
StreamCreateConsumerGroup
(
key
,
group
,
Position
.
Beginning
);
db
.
StreamCreateConsumerGroup
(
key
,
group
,
Stream
Position
.
Beginning
);
db
.
StreamReadGroup
(
key
,
group
,
consumer1
,
count
:
1
);
db
.
StreamReadGroup
(
key
,
group
,
consumer1
,
count
:
1
);
db
.
StreamReadGroup
(
key
,
group
,
consumer2
);
db
.
StreamReadGroup
(
key
,
group
,
consumer2
);
...
@@ -1045,7 +1045,7 @@ public void StreamPendingNoMessagesOrConsumers()
...
@@ -1045,7 +1045,7 @@ public void StreamPendingNoMessagesOrConsumers()
var
id
=
db
.
StreamAdd
(
key
,
"field1"
,
"value1"
);
var
id
=
db
.
StreamAdd
(
key
,
"field1"
,
"value1"
);
db
.
StreamDelete
(
key
,
new
RedisValue
[]
{
id
});
db
.
StreamDelete
(
key
,
new
RedisValue
[]
{
id
});
db
.
StreamCreateConsumerGroup
(
key
,
groupName
,
new
Position
(
"0-0"
)
);
db
.
StreamCreateConsumerGroup
(
key
,
groupName
,
"0-0"
);
var
pendingInfo
=
db
.
StreamPending
(
key
,
"test_group"
);
var
pendingInfo
=
db
.
StreamPending
(
key
,
"test_group"
);
...
@@ -1060,38 +1060,38 @@ public void StreamPendingNoMessagesOrConsumers()
...
@@ -1060,38 +1060,38 @@ public void StreamPendingNoMessagesOrConsumers()
[
Fact
]
[
Fact
]
public
void
StreamPositionDefaultValueIsBeginning
()
public
void
StreamPositionDefaultValueIsBeginning
()
{
{
Position
position
=
default
(
Position
)
;
RedisValue
position
=
StreamPosition
.
Beginning
;
Assert
.
Equal
(
StreamConstants
.
ReadMinValue
,
position
.
ResolveForCommand
(
RedisCommand
.
XREAD
));
Assert
.
Equal
(
StreamConstants
.
ReadMinValue
,
StreamPosition
.
Resolve
(
position
,
RedisCommand
.
XREAD
));
Assert
.
Equal
(
StreamConstants
.
ReadMinValue
,
position
.
ResolveForCommand
(
RedisCommand
.
XREADGROUP
));
Assert
.
Equal
(
StreamConstants
.
ReadMinValue
,
StreamPosition
.
Resolve
(
position
,
RedisCommand
.
XREADGROUP
));
Assert
.
Equal
(
StreamConstants
.
ReadMinValue
,
position
.
ResolveForCommand
(
RedisCommand
.
XGROUP
));
Assert
.
Equal
(
StreamConstants
.
ReadMinValue
,
StreamPosition
.
Resolve
(
position
,
RedisCommand
.
XGROUP
));
}
}
[
Fact
]
[
Fact
]
public
void
StreamPositionValidateBeginning
()
public
void
StreamPositionValidateBeginning
()
{
{
var
position
=
Position
.
Beginning
;
var
position
=
Stream
Position
.
Beginning
;
Assert
.
Equal
(
StreamConstants
.
ReadMinValue
,
position
.
ResolveForCommand
(
RedisCommand
.
XREAD
));
Assert
.
Equal
(
StreamConstants
.
ReadMinValue
,
StreamPosition
.
Resolve
(
position
,
RedisCommand
.
XREAD
));
}
}
[
Fact
]
[
Fact
]
public
void
StreamPositionValidateExplicit
()
public
void
StreamPositionValidateExplicit
()
{
{
var
explicitValue
=
"1-0"
;
var
explicitValue
=
"1-0"
;
var
position
=
new
Position
(
explicitValue
)
;
var
position
=
explicitValue
;
Assert
.
Equal
(
explicitValue
,
position
.
ResolveForCommand
(
RedisCommand
.
XREAD
));
Assert
.
Equal
(
explicitValue
,
StreamPosition
.
Resolve
(
position
,
RedisCommand
.
XREAD
));
}
}
[
Fact
]
[
Fact
]
public
void
StreamPositionValidateNew
()
public
void
StreamPositionValidateNew
()
{
{
var
position
=
Position
.
New
;
var
position
=
StreamPosition
.
NewMessages
;
Assert
.
Equal
(
StreamConstants
.
NewMessages
,
position
.
ResolveForCommand
(
RedisCommand
.
XGROUP
));
Assert
.
Equal
(
StreamConstants
.
NewMessages
,
StreamPosition
.
Resolve
(
position
,
RedisCommand
.
XGROUP
));
Assert
.
Equal
(
StreamConstants
.
UndeliveredMessages
,
position
.
ResolveForCommand
(
RedisCommand
.
XREADGROUP
));
Assert
.
Equal
(
StreamConstants
.
UndeliveredMessages
,
StreamPosition
.
Resolve
(
position
,
RedisCommand
.
XREADGROUP
));
Assert
.
ThrowsAny
<
InvalidOperationException
>(()
=>
position
.
ResolveForCommand
(
RedisCommand
.
XREAD
));
Assert
.
ThrowsAny
<
InvalidOperationException
>(()
=>
StreamPosition
.
Resolve
(
position
,
RedisCommand
.
XREAD
));
}
}
[
Fact
]
[
Fact
]
...
@@ -1110,7 +1110,7 @@ public void StreamRead()
...
@@ -1110,7 +1110,7 @@ public void StreamRead()
var
id3
=
db
.
StreamAdd
(
key
,
"field3"
,
"value3"
);
var
id3
=
db
.
StreamAdd
(
key
,
"field3"
,
"value3"
);
// Read the entire stream from the beginning.
// Read the entire stream from the beginning.
var
entries
=
db
.
StreamRead
(
key
,
new
Position
(
"0-0"
)
);
var
entries
=
db
.
StreamRead
(
key
,
"0-0"
);
Assert
.
True
(
entries
.
Length
==
3
);
Assert
.
True
(
entries
.
Length
==
3
);
Assert
.
Equal
(
id1
,
entries
[
0
].
Id
);
Assert
.
Equal
(
id1
,
entries
[
0
].
Id
);
...
@@ -1138,7 +1138,7 @@ public void StreamReadEmptyStream()
...
@@ -1138,7 +1138,7 @@ public void StreamReadEmptyStream()
var
len
=
db
.
StreamLength
(
key
);
var
len
=
db
.
StreamLength
(
key
);
// Read the entire stream from the beginning.
// Read the entire stream from the beginning.
var
entries
=
db
.
StreamRead
(
key
,
new
Position
(
"0-0"
)
);
var
entries
=
db
.
StreamRead
(
key
,
"0-0"
);
Assert
.
True
(
entries
.
Length
==
0
);
Assert
.
True
(
entries
.
Length
==
0
);
Assert
.
Equal
(
0
,
len
);
Assert
.
Equal
(
0
,
len
);
...
@@ -1169,8 +1169,8 @@ public void StreamReadEmptyStreams()
...
@@ -1169,8 +1169,8 @@ public void StreamReadEmptyStreams()
var
len2
=
db
.
StreamLength
(
key2
);
var
len2
=
db
.
StreamLength
(
key2
);
// Read the entire stream from the beginning.
// Read the entire stream from the beginning.
var
entries1
=
db
.
StreamRead
(
key1
,
new
Position
(
"0-0"
)
);
var
entries1
=
db
.
StreamRead
(
key1
,
"0-0"
);
var
entries2
=
db
.
StreamRead
(
key2
,
new
Position
(
"0-0"
)
);
var
entries2
=
db
.
StreamRead
(
key2
,
"0-0"
);
Assert
.
True
(
entries1
.
Length
==
0
);
Assert
.
True
(
entries1
.
Length
==
0
);
Assert
.
True
(
entries2
.
Length
==
0
);
Assert
.
True
(
entries2
.
Length
==
0
);
...
@@ -1189,8 +1189,8 @@ public void StreamReadExpectedExceptionInvalidCountMultipleStream()
...
@@ -1189,8 +1189,8 @@ public void StreamReadExpectedExceptionInvalidCountMultipleStream()
var
streamPositions
=
new
StreamPosition
[]
var
streamPositions
=
new
StreamPosition
[]
{
{
new
StreamPosition
(
"key1"
,
new
Position
(
"0-0"
)
),
new
StreamPosition
(
"key1"
,
"0-0"
),
new
StreamPosition
(
"key2"
,
new
Position
(
"0-0"
)
)
new
StreamPosition
(
"key2"
,
"0-0"
)
};
};
var
db
=
conn
.
GetDatabase
();
var
db
=
conn
.
GetDatabase
();
...
@@ -1208,7 +1208,7 @@ public void StreamReadExpectedExceptionInvalidCountSingleStream()
...
@@ -1208,7 +1208,7 @@ public void StreamReadExpectedExceptionInvalidCountSingleStream()
Skip
.
IfMissingFeature
(
conn
,
nameof
(
RedisFeatures
.
Streams
),
r
=>
r
.
Streams
);
Skip
.
IfMissingFeature
(
conn
,
nameof
(
RedisFeatures
.
Streams
),
r
=>
r
.
Streams
);
var
db
=
conn
.
GetDatabase
();
var
db
=
conn
.
GetDatabase
();
Assert
.
Throws
<
ArgumentOutOfRangeException
>(()
=>
db
.
StreamRead
(
key
,
new
Position
(
"0-0"
)
,
0
));
Assert
.
Throws
<
ArgumentOutOfRangeException
>(()
=>
db
.
StreamRead
(
key
,
"0-0"
,
0
));
}
}
}
}
...
@@ -1259,8 +1259,8 @@ public void StreamReadMultipleStreams()
...
@@ -1259,8 +1259,8 @@ public void StreamReadMultipleStreams()
// Read from both streams at the same time.
// Read from both streams at the same time.
var
streamList
=
new
StreamPosition
[
2
]
var
streamList
=
new
StreamPosition
[
2
]
{
{
new
StreamPosition
(
key1
,
new
Position
(
"0-0"
)
),
new
StreamPosition
(
key1
,
"0-0"
),
new
StreamPosition
(
key2
,
new
Position
(
"0-0"
)
)
new
StreamPosition
(
key2
,
"0-0"
)
};
};
var
streams
=
db
.
StreamRead
(
streamList
);
var
streams
=
db
.
StreamRead
(
streamList
);
...
@@ -1298,8 +1298,8 @@ public void StreamReadMultipleStreamsWithCount()
...
@@ -1298,8 +1298,8 @@ public void StreamReadMultipleStreamsWithCount()
var
streamList
=
new
StreamPosition
[
2
]
var
streamList
=
new
StreamPosition
[
2
]
{
{
new
StreamPosition
(
key1
,
new
Position
(
"0-0"
)
),
new
StreamPosition
(
key1
,
"0-0"
),
new
StreamPosition
(
key2
,
new
Position
(
"0-0"
)
)
new
StreamPosition
(
key2
,
"0-0"
)
};
};
var
streams
=
db
.
StreamRead
(
streamList
,
countPerStream
:
1
);
var
streams
=
db
.
StreamRead
(
streamList
,
countPerStream
:
1
);
...
@@ -1336,10 +1336,10 @@ public void StreamReadMultipleStreamsWithReadPastSecondStream()
...
@@ -1336,10 +1336,10 @@ public void StreamReadMultipleStreamsWithReadPastSecondStream()
var
streamList
=
new
StreamPosition
[]
var
streamList
=
new
StreamPosition
[]
{
{
new
StreamPosition
(
key1
,
new
Position
(
"0-0"
)
),
new
StreamPosition
(
key1
,
"0-0"
),
// read past the end of stream # 2
// read past the end of stream # 2
new
StreamPosition
(
key2
,
new
Position
(
id4
)
)
new
StreamPosition
(
key2
,
id4
)
};
};
var
streams
=
db
.
StreamRead
(
streamList
);
var
streams
=
db
.
StreamRead
(
streamList
);
...
@@ -1372,8 +1372,8 @@ public void StreamReadMultipleStreamsWithEmptyResponse()
...
@@ -1372,8 +1372,8 @@ public void StreamReadMultipleStreamsWithEmptyResponse()
var
streamList
=
new
StreamPosition
[]
var
streamList
=
new
StreamPosition
[]
{
{
// Read past the end of both streams.
// Read past the end of both streams.
new
StreamPosition
(
key1
,
new
Position
(
id2
)
),
new
StreamPosition
(
key1
,
id2
),
new
StreamPosition
(
key2
,
new
Position
(
id4
)
)
new
StreamPosition
(
key2
,
id4
)
};
};
var
streams
=
db
.
StreamRead
(
streamList
);
var
streams
=
db
.
StreamRead
(
streamList
);
...
@@ -1399,7 +1399,7 @@ public void StreamReadPastEndOfStream()
...
@@ -1399,7 +1399,7 @@ public void StreamReadPastEndOfStream()
// Read after the final ID in the stream, we expect an empty array as a response.
// Read after the final ID in the stream, we expect an empty array as a response.
var
entries
=
db
.
StreamRead
(
key
,
new
Position
(
id2
)
);
var
entries
=
db
.
StreamRead
(
key
,
id2
);
Assert
.
True
(
entries
.
Length
==
0
);
Assert
.
True
(
entries
.
Length
==
0
);
}
}
...
@@ -1531,7 +1531,7 @@ public void StreamReadWithAfterIdAndCount_1()
...
@@ -1531,7 +1531,7 @@ public void StreamReadWithAfterIdAndCount_1()
var
id3
=
db
.
StreamAdd
(
key
,
"field3"
,
"value3"
);
var
id3
=
db
.
StreamAdd
(
key
,
"field3"
,
"value3"
);
// Only read a single item from the stream.
// Only read a single item from the stream.
var
entries
=
db
.
StreamRead
(
key
,
new
Position
(
id1
)
,
1
);
var
entries
=
db
.
StreamRead
(
key
,
id1
,
1
);
Assert
.
True
(
entries
.
Length
==
1
);
Assert
.
True
(
entries
.
Length
==
1
);
Assert
.
Equal
(
id2
,
entries
[
0
].
Id
);
Assert
.
Equal
(
id2
,
entries
[
0
].
Id
);
...
@@ -1555,7 +1555,7 @@ public void StreamReadWithAfterIdAndCount_2()
...
@@ -1555,7 +1555,7 @@ public void StreamReadWithAfterIdAndCount_2()
var
id4
=
db
.
StreamAdd
(
key
,
"field4"
,
"value4"
);
var
id4
=
db
.
StreamAdd
(
key
,
"field4"
,
"value4"
);
// Read multiple items from the stream.
// Read multiple items from the stream.
var
entries
=
db
.
StreamRead
(
key
,
new
Position
(
id1
)
,
2
);
var
entries
=
db
.
StreamRead
(
key
,
id1
,
2
);
Assert
.
True
(
entries
.
Length
==
2
);
Assert
.
True
(
entries
.
Length
==
2
);
Assert
.
Equal
(
id2
,
entries
[
0
].
Id
);
Assert
.
Equal
(
id2
,
entries
[
0
].
Id
);
...
...
StackExchange.Redis.Tests/WrapperBaseTests.cs
View file @
4054ad34
...
@@ -806,15 +806,15 @@ public void StreamConsumerInfoGetAsync()
...
@@ -806,15 +806,15 @@ public void StreamConsumerInfoGetAsync()
[
Fact
]
[
Fact
]
public
void
StreamConsumerGroupSetPositionAsync
()
public
void
StreamConsumerGroupSetPositionAsync
()
{
{
wrapper
.
StreamConsumerGroupSetPositionAsync
(
"key"
,
"group"
,
Position
.
Beginning
,
CommandFlags
.
HighPriority
);
wrapper
.
StreamConsumerGroupSetPositionAsync
(
"key"
,
"group"
,
Stream
Position
.
Beginning
,
CommandFlags
.
HighPriority
);
mock
.
Verify
(
_
=>
_
.
StreamConsumerGroupSetPositionAsync
(
"prefix:key"
,
"group"
,
Position
.
Beginning
,
CommandFlags
.
HighPriority
));
mock
.
Verify
(
_
=>
_
.
StreamConsumerGroupSetPositionAsync
(
"prefix:key"
,
"group"
,
Stream
Position
.
Beginning
,
CommandFlags
.
HighPriority
));
}
}
[
Fact
]
[
Fact
]
public
void
StreamCreateConsumerGroupAsync
()
public
void
StreamCreateConsumerGroupAsync
()
{
{
wrapper
.
StreamCreateConsumerGroupAsync
(
"key"
,
"group"
,
new
Position
(
"0-0"
)
,
CommandFlags
.
HighPriority
);
wrapper
.
StreamCreateConsumerGroupAsync
(
"key"
,
"group"
,
"0-0"
,
CommandFlags
.
HighPriority
);
mock
.
Verify
(
_
=>
_
.
StreamCreateConsumerGroupAsync
(
"prefix:key"
,
"group"
,
new
Position
(
"0-0"
)
,
CommandFlags
.
HighPriority
));
mock
.
Verify
(
_
=>
_
.
StreamCreateConsumerGroupAsync
(
"prefix:key"
,
"group"
,
"0-0"
,
CommandFlags
.
HighPriority
));
}
}
[
Fact
]
[
Fact
]
...
@@ -892,15 +892,15 @@ public void StreamReadAsync_1()
...
@@ -892,15 +892,15 @@ public void StreamReadAsync_1()
[
Fact
]
[
Fact
]
public
void
StreamReadAsync_2
()
public
void
StreamReadAsync_2
()
{
{
wrapper
.
StreamReadAsync
(
"key"
,
new
Position
(
"0-0"
)
,
null
,
CommandFlags
.
HighPriority
);
wrapper
.
StreamReadAsync
(
"key"
,
"0-0"
,
null
,
CommandFlags
.
HighPriority
);
mock
.
Verify
(
_
=>
_
.
StreamReadAsync
(
"prefix:key"
,
new
Position
(
"0-0"
)
,
null
,
CommandFlags
.
HighPriority
));
mock
.
Verify
(
_
=>
_
.
StreamReadAsync
(
"prefix:key"
,
"0-0"
,
null
,
CommandFlags
.
HighPriority
));
}
}
[
Fact
]
[
Fact
]
public
void
StreamReadGroupAsync_1
()
public
void
StreamReadGroupAsync_1
()
{
{
wrapper
.
StreamReadGroupAsync
(
"key"
,
"group"
,
"consumer"
,
Position
.
Beginning
,
10
,
CommandFlags
.
HighPriority
);
wrapper
.
StreamReadGroupAsync
(
"key"
,
"group"
,
"consumer"
,
Stream
Position
.
Beginning
,
10
,
CommandFlags
.
HighPriority
);
mock
.
Verify
(
_
=>
_
.
StreamReadGroupAsync
(
"prefix:key"
,
"group"
,
"consumer"
,
Position
.
Beginning
,
10
,
CommandFlags
.
HighPriority
));
mock
.
Verify
(
_
=>
_
.
StreamReadGroupAsync
(
"prefix:key"
,
"group"
,
"consumer"
,
Stream
Position
.
Beginning
,
10
,
CommandFlags
.
HighPriority
));
}
}
[
Fact
]
[
Fact
]
...
...
StackExchange.Redis/StackExchange/Redis/Interfaces/IDatabase.cs
View file @
4054ad34
...
@@ -1524,7 +1524,7 @@ public interface IDatabase : IRedis, IDatabaseAsync
...
@@ -1524,7 +1524,7 @@ public interface IDatabase : IRedis, IDatabaseAsync
/// <param name="position">The position from which to read for the consumer group.</param>
/// <param name="position">The position from which to read for the consumer group.</param>
/// <param name="flags">The flags to use for this operation.</param>
/// <param name="flags">The flags to use for this operation.</param>
/// <returns>True if successful, otherwise false.</returns>
/// <returns>True if successful, otherwise false.</returns>
bool
StreamConsumerGroupSetPosition
(
RedisKey
key
,
RedisValue
groupName
,
Position
position
,
CommandFlags
flags
=
CommandFlags
.
None
);
bool
StreamConsumerGroupSetPosition
(
RedisKey
key
,
RedisValue
groupName
,
RedisValue
position
,
CommandFlags
flags
=
CommandFlags
.
None
);
/// <summary>
/// <summary>
/// Retrieve information about the consumers for the given consumer group. This is the equivalent of calling "XINFO GROUPS key group".
/// Retrieve information about the consumers for the given consumer group. This is the equivalent of calling "XINFO GROUPS key group".
...
@@ -1541,11 +1541,11 @@ public interface IDatabase : IRedis, IDatabaseAsync
...
@@ -1541,11 +1541,11 @@ public interface IDatabase : IRedis, IDatabaseAsync
/// </summary>
/// </summary>
/// <param name="key">The key of the stream.</param>
/// <param name="key">The key of the stream.</param>
/// <param name="groupName">The name of the group to create.</param>
/// <param name="groupName">The name of the group to create.</param>
/// <param name="position">The position to begin reading the stream. Defaults to <see cref="
Position.New
"/>.</param>
/// <param name="position">The position to begin reading the stream. Defaults to <see cref="
StreamPosition.NewMessages
"/>.</param>
/// <param name="flags">The flags to use for this operation.</param>
/// <param name="flags">The flags to use for this operation.</param>
/// <returns>True if the group was created.</returns>
/// <returns>True if the group was created.</returns>
/// <remarks>https://redis.io/topics/streams-intro</remarks>
/// <remarks>https://redis.io/topics/streams-intro</remarks>
bool
StreamCreateConsumerGroup
(
RedisKey
key
,
RedisValue
groupName
,
Position
?
position
=
null
,
CommandFlags
flags
=
CommandFlags
.
None
);
bool
StreamCreateConsumerGroup
(
RedisKey
key
,
RedisValue
groupName
,
RedisValue
?
position
=
null
,
CommandFlags
flags
=
CommandFlags
.
None
);
/// <summary>
/// <summary>
/// Delete messages in the stream. This method does not delete the stream.
/// Delete messages in the stream. This method does not delete the stream.
...
@@ -1652,7 +1652,7 @@ public interface IDatabase : IRedis, IDatabaseAsync
...
@@ -1652,7 +1652,7 @@ public interface IDatabase : IRedis, IDatabaseAsync
/// <returns>Returns a value of <see cref="StreamEntry"/> for each message returned.</returns>
/// <returns>Returns a value of <see cref="StreamEntry"/> for each message returned.</returns>
/// <remarks>Equivalent of calling XREAD COUNT num STREAMS key id.</remarks>
/// <remarks>Equivalent of calling XREAD COUNT num STREAMS key id.</remarks>
/// <remarks>https://redis.io/commands/xread</remarks>
/// <remarks>https://redis.io/commands/xread</remarks>
StreamEntry
[]
StreamRead
(
RedisKey
key
,
Position
position
,
int
?
count
=
null
,
CommandFlags
flags
=
CommandFlags
.
None
);
StreamEntry
[]
StreamRead
(
RedisKey
key
,
RedisValue
position
,
int
?
count
=
null
,
CommandFlags
flags
=
CommandFlags
.
None
);
/// <summary>
/// <summary>
/// Read from multiple streams.
/// Read from multiple streams.
...
@@ -1671,12 +1671,12 @@ public interface IDatabase : IRedis, IDatabaseAsync
...
@@ -1671,12 +1671,12 @@ public interface IDatabase : IRedis, IDatabaseAsync
/// <param name="key">The key of the stream.</param>
/// <param name="key">The key of the stream.</param>
/// <param name="groupName">The name of the consumer group.</param>
/// <param name="groupName">The name of the consumer group.</param>
/// <param name="consumerName">The consumer name.</param>
/// <param name="consumerName">The consumer name.</param>
/// <param name="position">The position from which to read the stream. Defaults to <see cref="
Position.New
"/> when null.</param>
/// <param name="position">The position from which to read the stream. Defaults to <see cref="
StreamPosition.NewMessages
"/> when null.</param>
/// <param name="count">The maximum number of messages to return.</param>
/// <param name="count">The maximum number of messages to return.</param>
/// <param name="flags">The flags to use for this operation.</param>
/// <param name="flags">The flags to use for this operation.</param>
/// <returns>Returns a value of <see cref="StreamEntry"/> for each message returned.</returns>
/// <returns>Returns a value of <see cref="StreamEntry"/> for each message returned.</returns>
/// <remarks>https://redis.io/commands/xreadgroup</remarks>
/// <remarks>https://redis.io/commands/xreadgroup</remarks>
StreamEntry
[]
StreamReadGroup
(
RedisKey
key
,
RedisValue
groupName
,
RedisValue
consumerName
,
Position
?
position
=
null
,
int
?
count
=
null
,
CommandFlags
flags
=
CommandFlags
.
None
);
StreamEntry
[]
StreamReadGroup
(
RedisKey
key
,
RedisValue
groupName
,
RedisValue
consumerName
,
RedisValue
?
position
=
null
,
int
?
count
=
null
,
CommandFlags
flags
=
CommandFlags
.
None
);
/// <summary>
/// <summary>
/// Read from multiple streams into the given consumer group. The consumer group with the given <paramref name="groupName"/>
/// Read from multiple streams into the given consumer group. The consumer group with the given <paramref name="groupName"/>
...
...
StackExchange.Redis/StackExchange/Redis/Interfaces/IDatabaseAsync.cs
View file @
4054ad34
...
@@ -1435,7 +1435,7 @@ public interface IDatabaseAsync : IRedisAsync
...
@@ -1435,7 +1435,7 @@ public interface IDatabaseAsync : IRedisAsync
/// <param name="position">The position from which to read for the consumer group.</param>
/// <param name="position">The position from which to read for the consumer group.</param>
/// <param name="flags">The flags to use for this operation.</param>
/// <param name="flags">The flags to use for this operation.</param>
/// <returns>True if successful, otherwise false.</returns>
/// <returns>True if successful, otherwise false.</returns>
Task
<
bool
>
StreamConsumerGroupSetPositionAsync
(
RedisKey
key
,
RedisValue
groupName
,
Position
position
,
CommandFlags
flags
=
CommandFlags
.
None
);
Task
<
bool
>
StreamConsumerGroupSetPositionAsync
(
RedisKey
key
,
RedisValue
groupName
,
RedisValue
position
,
CommandFlags
flags
=
CommandFlags
.
None
);
/// <summary>
/// <summary>
/// Retrieve information about the consumers for the given consumer group. This is the equivalent of calling "XINFO GROUPS key group".
/// Retrieve information about the consumers for the given consumer group. This is the equivalent of calling "XINFO GROUPS key group".
...
@@ -1452,11 +1452,11 @@ public interface IDatabaseAsync : IRedisAsync
...
@@ -1452,11 +1452,11 @@ public interface IDatabaseAsync : IRedisAsync
/// </summary>
/// </summary>
/// <param name="key">The key of the stream.</param>
/// <param name="key">The key of the stream.</param>
/// <param name="groupName">The name of the group to create.</param>
/// <param name="groupName">The name of the group to create.</param>
/// <param name="position">The position to begin reading the stream. Defaults to <see cref="
Position.New
"/>.</param>
/// <param name="position">The position to begin reading the stream. Defaults to <see cref="
StreamPosition.NewMessages
"/>.</param>
/// <param name="flags">The flags to use for this operation.</param>
/// <param name="flags">The flags to use for this operation.</param>
/// <returns>True if the group was created.</returns>
/// <returns>True if the group was created.</returns>
/// <remarks>https://redis.io/topics/streams-intro</remarks>
/// <remarks>https://redis.io/topics/streams-intro</remarks>
Task
<
bool
>
StreamCreateConsumerGroupAsync
(
RedisKey
key
,
RedisValue
groupName
,
Position
?
position
=
null
,
CommandFlags
flags
=
CommandFlags
.
None
);
Task
<
bool
>
StreamCreateConsumerGroupAsync
(
RedisKey
key
,
RedisValue
groupName
,
RedisValue
?
position
=
null
,
CommandFlags
flags
=
CommandFlags
.
None
);
/// <summary>
/// <summary>
/// Delete messages in the stream. This method does not delete the stream.
/// Delete messages in the stream. This method does not delete the stream.
...
@@ -1563,7 +1563,7 @@ public interface IDatabaseAsync : IRedisAsync
...
@@ -1563,7 +1563,7 @@ public interface IDatabaseAsync : IRedisAsync
/// <returns>Returns an instance of <see cref="StreamEntry"/> for each message returned.</returns>
/// <returns>Returns an instance of <see cref="StreamEntry"/> for each message returned.</returns>
/// <remarks>Equivalent of calling XREAD COUNT num STREAMS key id.</remarks>
/// <remarks>Equivalent of calling XREAD COUNT num STREAMS key id.</remarks>
/// <remarks>https://redis.io/commands/xread</remarks>
/// <remarks>https://redis.io/commands/xread</remarks>
Task
<
StreamEntry
[
]>
StreamReadAsync
(
RedisKey
key
,
Position
position
,
int
?
count
=
null
,
CommandFlags
flags
=
CommandFlags
.
None
);
Task
<
StreamEntry
[
]>
StreamReadAsync
(
RedisKey
key
,
RedisValue
position
,
int
?
count
=
null
,
CommandFlags
flags
=
CommandFlags
.
None
);
/// <summary>
/// <summary>
/// Read from multiple streams.
/// Read from multiple streams.
...
@@ -1582,12 +1582,12 @@ public interface IDatabaseAsync : IRedisAsync
...
@@ -1582,12 +1582,12 @@ public interface IDatabaseAsync : IRedisAsync
/// <param name="key">The key of the stream.</param>
/// <param name="key">The key of the stream.</param>
/// <param name="groupName">The name of the consumer group.</param>
/// <param name="groupName">The name of the consumer group.</param>
/// <param name="consumerName">The consumer name.</param>
/// <param name="consumerName">The consumer name.</param>
/// <param name="position">The position from which to read the stream. Defaults to <see cref="
Position.New
"/> when null.</param>
/// <param name="position">The position from which to read the stream. Defaults to <see cref="
StreamPosition.NewMessages
"/> when null.</param>
/// <param name="count">The maximum number of messages to return.</param>
/// <param name="count">The maximum number of messages to return.</param>
/// <param name="flags">The flags to use for this operation.</param>
/// <param name="flags">The flags to use for this operation.</param>
/// <returns>Returns a value of <see cref="StreamEntry"/> for each message returned.</returns>
/// <returns>Returns a value of <see cref="StreamEntry"/> for each message returned.</returns>
/// <remarks>https://redis.io/commands/xreadgroup</remarks>
/// <remarks>https://redis.io/commands/xreadgroup</remarks>
Task
<
StreamEntry
[
]>
StreamReadGroupAsync
(
RedisKey
key
,
RedisValue
groupName
,
RedisValue
consumerName
,
Position
?
position
=
null
,
int
?
count
=
null
,
CommandFlags
flags
=
CommandFlags
.
None
);
Task
<
StreamEntry
[
]>
StreamReadGroupAsync
(
RedisKey
key
,
RedisValue
groupName
,
RedisValue
consumerName
,
RedisValue
?
position
=
null
,
int
?
count
=
null
,
CommandFlags
flags
=
CommandFlags
.
None
);
/// <summary>
/// <summary>
/// Read from multiple streams into the given consumer group. The consumer group with the given <paramref name="groupName"/>
/// Read from multiple streams into the given consumer group. The consumer group with the given <paramref name="groupName"/>
...
...
StackExchange.Redis/StackExchange/Redis/KeyspaceIsolation/DatabaseWrapper.cs
View file @
4054ad34
...
@@ -636,12 +636,12 @@ public RedisValue[] StreamClaimIdsOnly(RedisKey key, RedisValue consumerGroup, R
...
@@ -636,12 +636,12 @@ public RedisValue[] StreamClaimIdsOnly(RedisKey key, RedisValue consumerGroup, R
return
Inner
.
StreamClaimIdsOnly
(
ToInner
(
key
),
consumerGroup
,
claimingConsumer
,
minIdleTimeInMs
,
messageIds
,
flags
);
return
Inner
.
StreamClaimIdsOnly
(
ToInner
(
key
),
consumerGroup
,
claimingConsumer
,
minIdleTimeInMs
,
messageIds
,
flags
);
}
}
public
bool
StreamConsumerGroupSetPosition
(
RedisKey
key
,
RedisValue
groupName
,
Position
position
,
CommandFlags
flags
=
CommandFlags
.
None
)
public
bool
StreamConsumerGroupSetPosition
(
RedisKey
key
,
RedisValue
groupName
,
RedisValue
position
,
CommandFlags
flags
=
CommandFlags
.
None
)
{
{
return
Inner
.
StreamConsumerGroupSetPosition
(
ToInner
(
key
),
groupName
,
position
,
flags
);
return
Inner
.
StreamConsumerGroupSetPosition
(
ToInner
(
key
),
groupName
,
position
,
flags
);
}
}
public
bool
StreamCreateConsumerGroup
(
RedisKey
key
,
RedisValue
groupName
,
Position
?
position
=
null
,
CommandFlags
flags
=
CommandFlags
.
None
)
public
bool
StreamCreateConsumerGroup
(
RedisKey
key
,
RedisValue
groupName
,
RedisValue
?
position
=
null
,
CommandFlags
flags
=
CommandFlags
.
None
)
{
{
return
Inner
.
StreamCreateConsumerGroup
(
ToInner
(
key
),
groupName
,
position
,
flags
);
return
Inner
.
StreamCreateConsumerGroup
(
ToInner
(
key
),
groupName
,
position
,
flags
);
}
}
...
@@ -696,7 +696,7 @@ public StreamEntry[] StreamRange(RedisKey key, RedisValue? minId = null, RedisVa
...
@@ -696,7 +696,7 @@ public StreamEntry[] StreamRange(RedisKey key, RedisValue? minId = null, RedisVa
return
Inner
.
StreamRange
(
ToInner
(
key
),
minId
,
maxId
,
count
,
messageOrder
,
flags
);
return
Inner
.
StreamRange
(
ToInner
(
key
),
minId
,
maxId
,
count
,
messageOrder
,
flags
);
}
}
public
StreamEntry
[]
StreamRead
(
RedisKey
key
,
Position
position
,
int
?
count
=
null
,
CommandFlags
flags
=
CommandFlags
.
None
)
public
StreamEntry
[]
StreamRead
(
RedisKey
key
,
RedisValue
position
,
int
?
count
=
null
,
CommandFlags
flags
=
CommandFlags
.
None
)
{
{
return
Inner
.
StreamRead
(
ToInner
(
key
),
position
,
count
,
flags
);
return
Inner
.
StreamRead
(
ToInner
(
key
),
position
,
count
,
flags
);
}
}
...
@@ -706,7 +706,7 @@ public RedisStream[] StreamRead(StreamPosition[] streamPositions, int? countPerS
...
@@ -706,7 +706,7 @@ public RedisStream[] StreamRead(StreamPosition[] streamPositions, int? countPerS
return
Inner
.
StreamRead
(
streamPositions
,
countPerStream
,
flags
);
return
Inner
.
StreamRead
(
streamPositions
,
countPerStream
,
flags
);
}
}
public
StreamEntry
[]
StreamReadGroup
(
RedisKey
key
,
RedisValue
groupName
,
RedisValue
consumerName
,
Position
?
position
=
null
,
int
?
count
=
null
,
CommandFlags
flags
=
CommandFlags
.
None
)
public
StreamEntry
[]
StreamReadGroup
(
RedisKey
key
,
RedisValue
groupName
,
RedisValue
consumerName
,
RedisValue
?
position
=
null
,
int
?
count
=
null
,
CommandFlags
flags
=
CommandFlags
.
None
)
{
{
return
Inner
.
StreamReadGroup
(
ToInner
(
key
),
groupName
,
consumerName
,
position
,
count
,
flags
);
return
Inner
.
StreamReadGroup
(
ToInner
(
key
),
groupName
,
consumerName
,
position
,
count
,
flags
);
}
}
...
...
StackExchange.Redis/StackExchange/Redis/KeyspaceIsolation/WrapperBase.cs
View file @
4054ad34
...
@@ -616,12 +616,12 @@ public Task<RedisValue[]> StreamClaimIdsOnlyAsync(RedisKey key, RedisValue consu
...
@@ -616,12 +616,12 @@ public Task<RedisValue[]> StreamClaimIdsOnlyAsync(RedisKey key, RedisValue consu
return
Inner
.
StreamClaimIdsOnlyAsync
(
ToInner
(
key
),
consumerGroup
,
claimingConsumer
,
minIdleTimeInMs
,
messageIds
,
flags
);
return
Inner
.
StreamClaimIdsOnlyAsync
(
ToInner
(
key
),
consumerGroup
,
claimingConsumer
,
minIdleTimeInMs
,
messageIds
,
flags
);
}
}
public
Task
<
bool
>
StreamConsumerGroupSetPositionAsync
(
RedisKey
key
,
RedisValue
groupName
,
Position
position
,
CommandFlags
flags
=
CommandFlags
.
None
)
public
Task
<
bool
>
StreamConsumerGroupSetPositionAsync
(
RedisKey
key
,
RedisValue
groupName
,
RedisValue
position
,
CommandFlags
flags
=
CommandFlags
.
None
)
{
{
return
Inner
.
StreamConsumerGroupSetPositionAsync
(
ToInner
(
key
),
groupName
,
position
,
flags
);
return
Inner
.
StreamConsumerGroupSetPositionAsync
(
ToInner
(
key
),
groupName
,
position
,
flags
);
}
}
public
Task
<
bool
>
StreamCreateConsumerGroupAsync
(
RedisKey
key
,
RedisValue
groupName
,
Position
?
position
=
null
,
CommandFlags
flags
=
CommandFlags
.
None
)
public
Task
<
bool
>
StreamCreateConsumerGroupAsync
(
RedisKey
key
,
RedisValue
groupName
,
RedisValue
?
position
=
null
,
CommandFlags
flags
=
CommandFlags
.
None
)
{
{
return
Inner
.
StreamCreateConsumerGroupAsync
(
ToInner
(
key
),
groupName
,
position
,
flags
);
return
Inner
.
StreamCreateConsumerGroupAsync
(
ToInner
(
key
),
groupName
,
position
,
flags
);
}
}
...
@@ -676,7 +676,7 @@ public Task<StreamEntry[]> StreamRangeAsync(RedisKey key, RedisValue? minId = nu
...
@@ -676,7 +676,7 @@ public Task<StreamEntry[]> StreamRangeAsync(RedisKey key, RedisValue? minId = nu
return
Inner
.
StreamRangeAsync
(
ToInner
(
key
),
minId
,
maxId
,
count
,
messageOrder
,
flags
);
return
Inner
.
StreamRangeAsync
(
ToInner
(
key
),
minId
,
maxId
,
count
,
messageOrder
,
flags
);
}
}
public
Task
<
StreamEntry
[
]>
StreamReadAsync
(
RedisKey
key
,
Position
position
,
int
?
count
=
null
,
CommandFlags
flags
=
CommandFlags
.
None
)
public
Task
<
StreamEntry
[
]>
StreamReadAsync
(
RedisKey
key
,
RedisValue
position
,
int
?
count
=
null
,
CommandFlags
flags
=
CommandFlags
.
None
)
{
{
return
Inner
.
StreamReadAsync
(
ToInner
(
key
),
position
,
count
,
flags
);
return
Inner
.
StreamReadAsync
(
ToInner
(
key
),
position
,
count
,
flags
);
}
}
...
@@ -686,7 +686,7 @@ public Task<RedisStream[]> StreamReadAsync(StreamPosition[] streamPositions, int
...
@@ -686,7 +686,7 @@ public Task<RedisStream[]> StreamReadAsync(StreamPosition[] streamPositions, int
return
Inner
.
StreamReadAsync
(
streamPositions
,
countPerStream
,
flags
);
return
Inner
.
StreamReadAsync
(
streamPositions
,
countPerStream
,
flags
);
}
}
public
Task
<
StreamEntry
[
]>
StreamReadGroupAsync
(
RedisKey
key
,
RedisValue
groupName
,
RedisValue
consumerName
,
Position
?
position
=
null
,
int
?
count
=
null
,
CommandFlags
flags
=
CommandFlags
.
None
)
public
Task
<
StreamEntry
[
]>
StreamReadGroupAsync
(
RedisKey
key
,
RedisValue
groupName
,
RedisValue
consumerName
,
RedisValue
?
position
=
null
,
int
?
count
=
null
,
CommandFlags
flags
=
CommandFlags
.
None
)
{
{
return
Inner
.
StreamReadGroupAsync
(
ToInner
(
key
),
groupName
,
consumerName
,
position
,
count
,
flags
);
return
Inner
.
StreamReadGroupAsync
(
ToInner
(
key
),
groupName
,
consumerName
,
position
,
count
,
flags
);
}
}
...
...
StackExchange.Redis/StackExchange/Redis/Position.cs
deleted
100644 → 0
View file @
3dc56ba1
using
System
;
namespace
StackExchange.Redis
{
/// <summary>
/// A position within a stream. Defaults to <see cref="Position.New"/>.
/// </summary>
public
struct
Position
{
/// <summary>
/// Indicate a position from which to read a stream.
/// </summary>
/// <param name="readAfter">The position from which to read a stream.</param>
public
Position
(
RedisValue
readAfter
)
{
if
(
readAfter
==
RedisValue
.
Null
)
throw
new
ArgumentNullException
(
nameof
(
readAfter
),
"readAfter cannot be RedisValue.Null."
);
Kind
=
PositionKind
.
Explicit
;
ExplicitValue
=
readAfter
;
}
private
Position
(
PositionKind
kind
)
{
Kind
=
kind
;
ExplicitValue
=
RedisValue
.
Null
;
}
private
PositionKind
Kind
{
get
;
}
private
RedisValue
ExplicitValue
{
get
;
}
/// <summary>
/// Read new messages.
/// </summary>
public
static
Position
New
=
new
Position
(
PositionKind
.
New
);
/// <summary>
/// Read from the beginning of a stream.
/// </summary>
public
static
Position
Beginning
=
new
Position
(
PositionKind
.
Beginning
);
internal
RedisValue
ResolveForCommand
(
RedisCommand
command
)
{
if
(
Kind
==
PositionKind
.
Explicit
)
return
ExplicitValue
;
if
(
Kind
==
PositionKind
.
Beginning
)
return
StreamConstants
.
ReadMinValue
;
// PositionKind.New
if
(
command
==
RedisCommand
.
XREAD
)
throw
new
InvalidOperationException
(
"Position.New cannot be used with StreamRead."
);
if
(
command
==
RedisCommand
.
XREADGROUP
)
return
StreamConstants
.
UndeliveredMessages
;
if
(
command
==
RedisCommand
.
XGROUP
)
return
StreamConstants
.
NewMessages
;
throw
new
ArgumentException
(
$"Unsupported command in ResolveForCommand:
{
command
}
."
,
nameof
(
command
));
}
}
}
StackExchange.Redis/StackExchange/Redis/RedisDatabase.cs
View file @
4054ad34
...
@@ -1789,7 +1789,7 @@ public Task<RedisValue[]> StreamClaimIdsOnlyAsync(RedisKey key, RedisValue consu
...
@@ -1789,7 +1789,7 @@ public Task<RedisValue[]> StreamClaimIdsOnlyAsync(RedisKey key, RedisValue consu
return
ExecuteAsync
(
msg
,
ResultProcessor
.
RedisValueArray
);
return
ExecuteAsync
(
msg
,
ResultProcessor
.
RedisValueArray
);
}
}
public
bool
StreamConsumerGroupSetPosition
(
RedisKey
key
,
RedisValue
groupName
,
Position
position
,
CommandFlags
flags
=
CommandFlags
.
None
)
public
bool
StreamConsumerGroupSetPosition
(
RedisKey
key
,
RedisValue
groupName
,
RedisValue
position
,
CommandFlags
flags
=
CommandFlags
.
None
)
{
{
var
msg
=
Message
.
Create
(
Database
,
var
msg
=
Message
.
Create
(
Database
,
flags
,
flags
,
...
@@ -1799,13 +1799,13 @@ public bool StreamConsumerGroupSetPosition(RedisKey key, RedisValue groupName, P
...
@@ -1799,13 +1799,13 @@ public bool StreamConsumerGroupSetPosition(RedisKey key, RedisValue groupName, P
StreamConstants
.
SetId
,
StreamConstants
.
SetId
,
key
.
AsRedisValue
(),
key
.
AsRedisValue
(),
groupName
,
groupName
,
position
.
ResolveForCommand
(
RedisCommand
.
XGROUP
)
StreamPosition
.
Resolve
(
position
,
RedisCommand
.
XGROUP
)
});
});
return
ExecuteSync
(
msg
,
ResultProcessor
.
Boolean
);
return
ExecuteSync
(
msg
,
ResultProcessor
.
Boolean
);
}
}
public
Task
<
bool
>
StreamConsumerGroupSetPositionAsync
(
RedisKey
key
,
RedisValue
groupName
,
Position
position
,
CommandFlags
flags
=
CommandFlags
.
None
)
public
Task
<
bool
>
StreamConsumerGroupSetPositionAsync
(
RedisKey
key
,
RedisValue
groupName
,
RedisValue
position
,
CommandFlags
flags
=
CommandFlags
.
None
)
{
{
var
msg
=
Message
.
Create
(
Database
,
var
msg
=
Message
.
Create
(
Database
,
flags
,
flags
,
...
@@ -1815,15 +1815,15 @@ public Task<bool> StreamConsumerGroupSetPositionAsync(RedisKey key, RedisValue g
...
@@ -1815,15 +1815,15 @@ public Task<bool> StreamConsumerGroupSetPositionAsync(RedisKey key, RedisValue g
StreamConstants
.
SetId
,
StreamConstants
.
SetId
,
key
.
AsRedisValue
(),
key
.
AsRedisValue
(),
groupName
,
groupName
,
position
.
ResolveForCommand
(
RedisCommand
.
XGROUP
)
StreamPosition
.
Resolve
(
position
,
RedisCommand
.
XGROUP
)
});
});
return
ExecuteAsync
(
msg
,
ResultProcessor
.
Boolean
);
return
ExecuteAsync
(
msg
,
ResultProcessor
.
Boolean
);
}
}
public
bool
StreamCreateConsumerGroup
(
RedisKey
key
,
RedisValue
groupName
,
Position
?
position
=
null
,
CommandFlags
flags
=
CommandFlags
.
None
)
public
bool
StreamCreateConsumerGroup
(
RedisKey
key
,
RedisValue
groupName
,
RedisValue
?
position
=
null
,
CommandFlags
flags
=
CommandFlags
.
None
)
{
{
var
actualPosition
=
position
??
Position
.
New
;
var
actualPosition
=
position
??
StreamConstants
.
NewMessages
;
var
msg
=
Message
.
Create
(
Database
,
var
msg
=
Message
.
Create
(
Database
,
flags
,
flags
,
...
@@ -1833,15 +1833,15 @@ public bool StreamCreateConsumerGroup(RedisKey key, RedisValue groupName, Positi
...
@@ -1833,15 +1833,15 @@ public bool StreamCreateConsumerGroup(RedisKey key, RedisValue groupName, Positi
StreamConstants
.
Create
,
StreamConstants
.
Create
,
key
.
AsRedisValue
(),
key
.
AsRedisValue
(),
groupName
,
groupName
,
actualPosition
.
ResolveForCommand
(
RedisCommand
.
XGROUP
)
StreamPosition
.
Resolve
(
actualPosition
,
RedisCommand
.
XGROUP
)
});
});
return
ExecuteSync
(
msg
,
ResultProcessor
.
Boolean
);
return
ExecuteSync
(
msg
,
ResultProcessor
.
Boolean
);
}
}
public
Task
<
bool
>
StreamCreateConsumerGroupAsync
(
RedisKey
key
,
RedisValue
groupName
,
Position
?
position
=
null
,
CommandFlags
flags
=
CommandFlags
.
None
)
public
Task
<
bool
>
StreamCreateConsumerGroupAsync
(
RedisKey
key
,
RedisValue
groupName
,
RedisValue
?
position
=
null
,
CommandFlags
flags
=
CommandFlags
.
None
)
{
{
var
actualPosition
=
position
??
Position
.
New
;
var
actualPosition
=
position
??
StreamPosition
.
NewMessages
;
var
msg
=
Message
.
Create
(
Database
,
var
msg
=
Message
.
Create
(
Database
,
flags
,
flags
,
...
@@ -1851,7 +1851,7 @@ public Task<bool> StreamCreateConsumerGroupAsync(RedisKey key, RedisValue groupN
...
@@ -1851,7 +1851,7 @@ public Task<bool> StreamCreateConsumerGroupAsync(RedisKey key, RedisValue groupN
StreamConstants
.
Create
,
StreamConstants
.
Create
,
key
.
AsRedisValue
(),
key
.
AsRedisValue
(),
groupName
,
groupName
,
actualPosition
.
ResolveForCommand
(
RedisCommand
.
XGROUP
)
StreamPosition
.
Resolve
(
actualPosition
,
RedisCommand
.
XGROUP
)
});
});
return
ExecuteAsync
(
msg
,
ResultProcessor
.
Boolean
);
return
ExecuteAsync
(
msg
,
ResultProcessor
.
Boolean
);
...
@@ -2069,20 +2069,20 @@ public Task<StreamEntry[]> StreamRangeAsync(RedisKey key, RedisValue? minId = nu
...
@@ -2069,20 +2069,20 @@ public Task<StreamEntry[]> StreamRangeAsync(RedisKey key, RedisValue? minId = nu
return
ExecuteAsync
(
msg
,
ResultProcessor
.
SingleStream
);
return
ExecuteAsync
(
msg
,
ResultProcessor
.
SingleStream
);
}
}
public
StreamEntry
[]
StreamRead
(
RedisKey
key
,
Position
position
,
int
?
count
=
null
,
CommandFlags
flags
=
CommandFlags
.
None
)
public
StreamEntry
[]
StreamRead
(
RedisKey
key
,
RedisValue
position
,
int
?
count
=
null
,
CommandFlags
flags
=
CommandFlags
.
None
)
{
{
var
msg
=
GetSingleStreamReadMessage
(
key
,
var
msg
=
GetSingleStreamReadMessage
(
key
,
position
.
ResolveForCommand
(
RedisCommand
.
XREAD
),
StreamPosition
.
Resolve
(
position
,
RedisCommand
.
XREAD
),
count
,
count
,
flags
);
flags
);
return
ExecuteSync
(
msg
,
ResultProcessor
.
SingleStreamWithNameSkip
);
return
ExecuteSync
(
msg
,
ResultProcessor
.
SingleStreamWithNameSkip
);
}
}
public
Task
<
StreamEntry
[
]>
StreamReadAsync
(
RedisKey
key
,
Position
position
,
int
?
count
=
null
,
CommandFlags
flags
=
CommandFlags
.
None
)
public
Task
<
StreamEntry
[
]>
StreamReadAsync
(
RedisKey
key
,
RedisValue
position
,
int
?
count
=
null
,
CommandFlags
flags
=
CommandFlags
.
None
)
{
{
var
msg
=
GetSingleStreamReadMessage
(
key
,
var
msg
=
GetSingleStreamReadMessage
(
key
,
position
.
ResolveForCommand
(
RedisCommand
.
XREAD
),
StreamPosition
.
Resolve
(
position
,
RedisCommand
.
XREAD
),
count
,
count
,
flags
);
flags
);
...
@@ -2101,28 +2101,28 @@ public Task<RedisStream[]> StreamReadAsync(StreamPosition[] streamPositions, int
...
@@ -2101,28 +2101,28 @@ public Task<RedisStream[]> StreamReadAsync(StreamPosition[] streamPositions, int
return
ExecuteAsync
(
msg
,
ResultProcessor
.
MultiStream
);
return
ExecuteAsync
(
msg
,
ResultProcessor
.
MultiStream
);
}
}
public
StreamEntry
[]
StreamReadGroup
(
RedisKey
key
,
RedisValue
groupName
,
RedisValue
consumerName
,
Position
?
position
=
null
,
int
?
count
=
null
,
CommandFlags
flags
=
CommandFlags
.
None
)
public
StreamEntry
[]
StreamReadGroup
(
RedisKey
key
,
RedisValue
groupName
,
RedisValue
consumerName
,
RedisValue
?
position
=
null
,
int
?
count
=
null
,
CommandFlags
flags
=
CommandFlags
.
None
)
{
{
var
actualPosition
=
position
??
Position
.
New
;
var
actualPosition
=
position
??
StreamPosition
.
NewMessages
;
var
msg
=
GetStreamReadGroupMessage
(
key
,
var
msg
=
GetStreamReadGroupMessage
(
key
,
groupName
,
groupName
,
consumerName
,
consumerName
,
actualPosition
.
ResolveForCommand
(
RedisCommand
.
XREADGROUP
),
StreamPosition
.
Resolve
(
actualPosition
,
RedisCommand
.
XREADGROUP
),
count
,
count
,
flags
);
flags
);
return
ExecuteSync
(
msg
,
ResultProcessor
.
SingleStreamWithNameSkip
);
return
ExecuteSync
(
msg
,
ResultProcessor
.
SingleStreamWithNameSkip
);
}
}
public
Task
<
StreamEntry
[
]>
StreamReadGroupAsync
(
RedisKey
key
,
RedisValue
groupName
,
RedisValue
consumerName
,
Position
?
position
=
null
,
int
?
count
=
null
,
CommandFlags
flags
=
CommandFlags
.
None
)
public
Task
<
StreamEntry
[
]>
StreamReadGroupAsync
(
RedisKey
key
,
RedisValue
groupName
,
RedisValue
consumerName
,
RedisValue
?
position
=
null
,
int
?
count
=
null
,
CommandFlags
flags
=
CommandFlags
.
None
)
{
{
var
actualPosition
=
position
??
Position
.
New
;
var
actualPosition
=
position
??
StreamPosition
.
NewMessages
;
var
msg
=
GetStreamReadGroupMessage
(
key
,
var
msg
=
GetStreamReadGroupMessage
(
key
,
groupName
,
groupName
,
consumerName
,
consumerName
,
actualPosition
.
ResolveForCommand
(
RedisCommand
.
XREADGROUP
),
StreamPosition
.
Resolve
(
actualPosition
,
RedisCommand
.
XREADGROUP
),
count
,
count
,
flags
);
flags
);
...
@@ -2545,7 +2545,7 @@ private Message GetMultiStreamReadGroupMessage(StreamPosition[] streamPositions,
...
@@ -2545,7 +2545,7 @@ private Message GetMultiStreamReadGroupMessage(StreamPosition[] streamPositions,
for
(
var
i
=
0
;
i
<
pairCount
;
i
++)
for
(
var
i
=
0
;
i
<
pairCount
;
i
++)
{
{
values
[
offset
]
=
streamPositions
[
i
].
Key
.
AsRedisValue
();
values
[
offset
]
=
streamPositions
[
i
].
Key
.
AsRedisValue
();
values
[
offset
+
pairCount
]
=
streamPositions
[
i
].
Position
.
ResolveForCommand
(
RedisCommand
.
XREADGROUP
);
values
[
offset
+
pairCount
]
=
StreamPosition
.
Resolve
(
streamPositions
[
i
].
Position
,
RedisCommand
.
XREADGROUP
);
offset
++;
offset
++;
}
}
...
@@ -2601,7 +2601,7 @@ private Message GetMultiStreamReadMessage(StreamPosition[] streamPositions, int?
...
@@ -2601,7 +2601,7 @@ private Message GetMultiStreamReadMessage(StreamPosition[] streamPositions, int?
for
(
var
i
=
0
;
i
<
pairCount
;
i
++)
for
(
var
i
=
0
;
i
<
pairCount
;
i
++)
{
{
values
[
offset
]
=
streamPositions
[
i
].
Key
.
AsRedisValue
();
values
[
offset
]
=
streamPositions
[
i
].
Key
.
AsRedisValue
();
values
[
offset
+
pairCount
]
=
streamPositions
[
i
].
Position
.
ResolveForCommand
(
RedisCommand
.
XREAD
);
values
[
offset
+
pairCount
]
=
StreamPosition
.
Resolve
(
streamPositions
[
i
].
Position
,
RedisCommand
.
XREAD
);
offset
++;
offset
++;
}
}
...
...
StackExchange.Redis/StackExchange/Redis/StreamPosition.cs
View file @
4054ad34
namespace
StackExchange.Redis
using
System
;
namespace
StackExchange.Redis
{
{
/// <summary>
/// <summary>
/// Describes a pair consisting of the Stream Key and the <see cref="Position"/> from which to begin reading a stream.
/// Describes a pair consisting of the Stream Key and the <see cref="Position"/> from which to begin reading a stream.
/// </summary>
/// </summary>
public
struct
StreamPosition
public
struct
StreamPosition
{
{
/// <summary>
/// Read from the beginning of a stream.
/// </summary>
public
static
RedisValue
Beginning
=>
StreamConstants
.
ReadMinValue
;
/// <summary>
/// Read new messages.
/// </summary>
public
static
RedisValue
NewMessages
=>
StreamConstants
.
NewMessages
;
/// <summary>
/// <summary>
/// Initializes a <see cref="StreamPosition"/> value.
/// Initializes a <see cref="StreamPosition"/> value.
/// </summary>
/// </summary>
/// <param name="key">The key for the stream.</param>
/// <param name="key">The key for the stream.</param>
/// <param name="position">The position from which to begin reading the stream.</param>
/// <param name="position">The position from which to begin reading the stream.</param>
public
StreamPosition
(
RedisKey
key
,
Position
position
)
public
StreamPosition
(
RedisKey
key
,
RedisValue
position
)
{
{
Key
=
key
;
Key
=
key
;
Position
=
position
;
Position
=
position
;
...
@@ -24,6 +36,22 @@ public StreamPosition(RedisKey key, Position position)
...
@@ -24,6 +36,22 @@ public StreamPosition(RedisKey key, Position position)
/// <summary>
/// <summary>
/// The offset at which to begin reading the stream.
/// The offset at which to begin reading the stream.
/// </summary>
/// </summary>
public
Position
Position
{
get
;
}
public
RedisValue
Position
{
get
;
}
internal
static
RedisValue
Resolve
(
RedisValue
value
,
RedisCommand
command
)
{
if
(
value
==
NewMessages
)
{
switch
(
command
)
{
case
RedisCommand
.
XREAD
:
throw
new
InvalidOperationException
(
"StreamPosition.NewMessages cannot be used with StreamRead."
);
case
RedisCommand
.
XREADGROUP
:
return
StreamConstants
.
UndeliveredMessages
;
case
RedisCommand
.
XGROUP
:
return
StreamConstants
.
NewMessages
;
default
:
// new is only valid for the above
throw
new
ArgumentException
(
$"Unsupported command in StreamPosition.Resolve:
{
command
}
."
,
nameof
(
command
));
}
}
return
value
;
}
}
}
}
}
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment