Commit 8761282d authored by Martinek Vilmos's avatar Martinek Vilmos Committed by Marc Gravell

Add pop functionality to sorted set (ZPOPMIN, ZPOPMAX) (#1026)

* Add pop functionality to sorted set (ZPOPMIN, ZPOPMAX)

* Add feature check for sorted set pop

* Add tests for sorted set pop
parent cad91c7b
...@@ -187,6 +187,8 @@ internal enum RedisCommand ...@@ -187,6 +187,8 @@ internal enum RedisCommand
ZINCRBY, ZINCRBY,
ZINTERSTORE, ZINTERSTORE,
ZLEXCOUNT, ZLEXCOUNT,
ZPOPMAX,
ZPOPMIN,
ZRANGE, ZRANGE,
ZRANGEBYLEX, ZRANGEBYLEX,
ZRANGEBYSCORE, ZRANGEBYSCORE,
......
...@@ -1451,6 +1451,29 @@ public interface IDatabase : IRedis, IDatabaseAsync ...@@ -1451,6 +1451,29 @@ public interface IDatabase : IRedis, IDatabaseAsync
/// <remarks>https://redis.io/commands/zscore</remarks> /// <remarks>https://redis.io/commands/zscore</remarks>
double? SortedSetScore(RedisKey key, RedisValue member, CommandFlags flags = CommandFlags.None); double? SortedSetScore(RedisKey key, RedisValue member, CommandFlags flags = CommandFlags.None);
/// <summary>
/// Removes and returns the first element from the sorted set stored at key, by default with the scores ordered from low to high.
/// </summary>
/// <param name="key">The key of the sorted set.</param>
/// <param name="order">The order to sort by (defaults to ascending).</param>
/// <param name="flags">The flags to use for this operation.</param>
/// <returns>The removed element, or nil when key does not exist.</returns>
/// <remarks>https://redis.io/commands/zpopmin</remarks>
/// <remarks>https://redis.io/commands/zpopmax</remarks>
SortedSetEntry? SortedSetPop(RedisKey key, Order order = Order.Ascending, CommandFlags flags = CommandFlags.None);
/// <summary>
/// Removes and returns the specified number of first elements from the sorted set stored at key, by default with the scores ordered from low to high.
/// </summary>
/// <param name="key">The key of the sorted set.</param>
/// <param name="count">The number of elements to return.</param>
/// <param name="order">The order to sort by (defaults to ascending).</param>
/// <param name="flags">The flags to use for this operation.</param>
/// <returns>An array of elements, or an empty array when key does not exist.</returns>
/// <remarks>https://redis.io/commands/zpopmin</remarks>
/// <remarks>https://redis.io/commands/zpopmax</remarks>
SortedSetEntry[] SortedSetPop(RedisKey key, long count, Order order = Order.Ascending, CommandFlags flags = CommandFlags.None);
/// <summary> /// <summary>
/// Allow the consumer to mark a pending message as correctly processed. Returns the number of messages acknowledged. /// Allow the consumer to mark a pending message as correctly processed. Returns the number of messages acknowledged.
/// </summary> /// </summary>
......
...@@ -1362,6 +1362,29 @@ public interface IDatabaseAsync : IRedisAsync ...@@ -1362,6 +1362,29 @@ public interface IDatabaseAsync : IRedisAsync
/// <remarks>https://redis.io/commands/zscore</remarks> /// <remarks>https://redis.io/commands/zscore</remarks>
Task<double?> SortedSetScoreAsync(RedisKey key, RedisValue member, CommandFlags flags = CommandFlags.None); Task<double?> SortedSetScoreAsync(RedisKey key, RedisValue member, CommandFlags flags = CommandFlags.None);
/// <summary>
/// Removes and returns the first element from the sorted set stored at key, by default with the scores ordered from low to high.
/// </summary>
/// <param name="key">The key of the sorted set.</param>
/// <param name="order">The order to sort by (defaults to ascending).</param>
/// <param name="flags">The flags to use for this operation.</param>
/// <returns>The removed element, or nil when key does not exist.</returns>
/// <remarks>https://redis.io/commands/zpopmin</remarks>
/// <remarks>https://redis.io/commands/zpopmax</remarks>
Task<SortedSetEntry?> SortedSetPopAsync(RedisKey key, Order order = Order.Ascending, CommandFlags flags = CommandFlags.None);
/// <summary>
/// Removes and returns the specified number of first elements from the sorted set stored at key, by default with the scores ordered from low to high.
/// </summary>
/// <param name="key">The key of the sorted set.</param>
/// <param name="count">The number of elements to return.</param>
/// <param name="order">The order to sort by (defaults to ascending).</param>
/// <param name="flags">The flags to use for this operation.</param>
/// <returns>An array of elements, or an empty array when key does not exist.</returns>
/// <remarks>https://redis.io/commands/zpopmin</remarks>
/// <remarks>https://redis.io/commands/zpopmax</remarks>
Task<SortedSetEntry[]> SortedSetPopAsync(RedisKey key, long count, Order order = Order.Ascending, CommandFlags flags = CommandFlags.None);
/// <summary> /// <summary>
/// Allow the consumer to mark a pending message as correctly processed. Returns the number of messages acknowledged. /// Allow the consumer to mark a pending message as correctly processed. Returns the number of messages acknowledged.
/// </summary> /// </summary>
......
...@@ -611,6 +611,16 @@ public long SortedSetRemoveRangeByValue(RedisKey key, RedisValue min, RedisValue ...@@ -611,6 +611,16 @@ public long SortedSetRemoveRangeByValue(RedisKey key, RedisValue min, RedisValue
return Inner.SortedSetScore(ToInner(key), member, flags); return Inner.SortedSetScore(ToInner(key), member, flags);
} }
public SortedSetEntry? SortedSetPop(RedisKey key, Order order = Order.Ascending, CommandFlags flags = CommandFlags.None)
{
return Inner.SortedSetPop(ToInner(key), order, flags);
}
public SortedSetEntry[] SortedSetPop(RedisKey key, long count, Order order = Order.Ascending, CommandFlags flags = CommandFlags.None)
{
return Inner.SortedSetPop(ToInner(key), count, order, flags);
}
public long StreamAcknowledge(RedisKey key, RedisValue groupName, RedisValue messageId, CommandFlags flags = CommandFlags.None) public long StreamAcknowledge(RedisKey key, RedisValue groupName, RedisValue messageId, CommandFlags flags = CommandFlags.None)
{ {
return Inner.StreamAcknowledge(ToInner(key), groupName, messageId, flags); return Inner.StreamAcknowledge(ToInner(key), groupName, messageId, flags);
......
...@@ -591,6 +591,16 @@ public Task<long> SortedSetRemoveRangeByValueAsync(RedisKey key, RedisValue min, ...@@ -591,6 +591,16 @@ public Task<long> SortedSetRemoveRangeByValueAsync(RedisKey key, RedisValue min,
return Inner.SortedSetScoreAsync(ToInner(key), member, flags); return Inner.SortedSetScoreAsync(ToInner(key), member, flags);
} }
public Task<SortedSetEntry?> SortedSetPopAsync(RedisKey key, Order order = Order.Ascending, CommandFlags flags = CommandFlags.None)
{
return Inner.SortedSetPopAsync(ToInner(key), order, flags);
}
public Task<SortedSetEntry[]> SortedSetPopAsync(RedisKey key, long count, Order order = Order.Ascending, CommandFlags flags = CommandFlags.None)
{
return Inner.SortedSetPopAsync(ToInner(key), count, order, flags);
}
public Task<long> StreamAcknowledgeAsync(RedisKey key, RedisValue groupName, RedisValue messageId, CommandFlags flags = CommandFlags.None) public Task<long> StreamAcknowledgeAsync(RedisKey key, RedisValue groupName, RedisValue messageId, CommandFlags flags = CommandFlags.None)
{ {
return Inner.StreamAcknowledgeAsync(ToInner(key), groupName, messageId, flags); return Inner.StreamAcknowledgeAsync(ToInner(key), groupName, messageId, flags);
......
...@@ -392,6 +392,8 @@ public static bool IsMasterOnly(RedisCommand command) ...@@ -392,6 +392,8 @@ public static bool IsMasterOnly(RedisCommand command)
case RedisCommand.ZADD: case RedisCommand.ZADD:
case RedisCommand.ZINTERSTORE: case RedisCommand.ZINTERSTORE:
case RedisCommand.ZINCRBY: case RedisCommand.ZINCRBY:
case RedisCommand.ZPOPMAX:
case RedisCommand.ZPOPMIN:
case RedisCommand.ZREM: case RedisCommand.ZREM:
case RedisCommand.ZREMRANGEBYLEX: case RedisCommand.ZREMRANGEBYLEX:
case RedisCommand.ZREMRANGEBYRANK: case RedisCommand.ZREMRANGEBYRANK:
......
...@@ -1677,6 +1677,36 @@ public IEnumerable<SortedSetEntry> SortedSetScan(RedisKey key, RedisValue patter ...@@ -1677,6 +1677,36 @@ public IEnumerable<SortedSetEntry> SortedSetScan(RedisKey key, RedisValue patter
return ExecuteAsync(msg, ResultProcessor.NullableDouble); return ExecuteAsync(msg, ResultProcessor.NullableDouble);
} }
public SortedSetEntry? SortedSetPop(RedisKey key, Order order = Order.Ascending, CommandFlags flags = CommandFlags.None)
{
var msg = Message.Create(Database, flags, order == Order.Descending ? RedisCommand.ZPOPMAX : RedisCommand.ZPOPMIN, key);
return ExecuteSync(msg, ResultProcessor.SortedSetEntry);
}
public Task<SortedSetEntry?> SortedSetPopAsync(RedisKey key, Order order = Order.Ascending, CommandFlags flags = CommandFlags.None)
{
var msg = Message.Create(Database, flags, order == Order.Descending ? RedisCommand.ZPOPMAX : RedisCommand.ZPOPMIN, key);
return ExecuteAsync(msg, ResultProcessor.SortedSetEntry);
}
public SortedSetEntry[] SortedSetPop(RedisKey key, long count, Order order = Order.Ascending, CommandFlags flags = CommandFlags.None)
{
if (count == 0) return Array.Empty<SortedSetEntry>();
var msg = count == 1
? Message.Create(Database, flags, order == Order.Descending ? RedisCommand.ZPOPMAX : RedisCommand.ZPOPMIN, key)
: Message.Create(Database, flags, order == Order.Descending ? RedisCommand.ZPOPMAX : RedisCommand.ZPOPMIN, key, count);
return ExecuteSync(msg, ResultProcessor.SortedSetWithScores);
}
public Task<SortedSetEntry[]> SortedSetPopAsync(RedisKey key, long count, Order order = Order.Ascending, CommandFlags flags = CommandFlags.None)
{
if (count == 0) return Task.FromResult(Array.Empty<SortedSetEntry>());
var msg = count == 1
? Message.Create(Database, flags, order == Order.Descending ? RedisCommand.ZPOPMAX : RedisCommand.ZPOPMIN, key)
: Message.Create(Database, flags, order == Order.Descending ? RedisCommand.ZPOPMAX : RedisCommand.ZPOPMIN, key, count);
return ExecuteAsync(msg, ResultProcessor.SortedSetWithScores);
}
public long StreamAcknowledge(RedisKey key, RedisValue groupName, RedisValue messageId, CommandFlags flags = CommandFlags.None) public long StreamAcknowledge(RedisKey key, RedisValue groupName, RedisValue messageId, CommandFlags flags = CommandFlags.None)
{ {
var msg = GetStreamAcknowledgeMessage(key, groupName, messageId, flags); var msg = GetStreamAcknowledgeMessage(key, groupName, messageId, flags);
......
...@@ -125,6 +125,11 @@ public RedisFeatures(Version version) ...@@ -125,6 +125,11 @@ public RedisFeatures(Version version)
/// </summary> /// </summary>
public bool SetVaradicAddRemove => Version >= v2_4_0; public bool SetVaradicAddRemove => Version >= v2_4_0;
/// <summary>
/// Is ZPOPMAX and ZPOPMIN available?
/// </summary>
public bool SortedSetPop => Version >= v4_9_1;
/// <summary> /// <summary>
/// Are Redis Streams available? /// Are Redis Streams available?
/// </summary> /// </summary>
......
...@@ -88,6 +88,8 @@ public static readonly MultiStreamProcessor ...@@ -88,6 +88,8 @@ public static readonly MultiStreamProcessor
public static readonly ResultProcessor<RedisResult> public static readonly ResultProcessor<RedisResult>
ScriptResult = new ScriptResultProcessor(); ScriptResult = new ScriptResultProcessor();
public static readonly SortedSetEntryProcessor
SortedSetEntry = new SortedSetEntryProcessor();
public static readonly SortedSetEntryArrayProcessor public static readonly SortedSetEntryArrayProcessor
SortedSetWithScores = new SortedSetEntryArrayProcessor(); SortedSetWithScores = new SortedSetEntryArrayProcessor();
...@@ -498,6 +500,40 @@ protected override bool SetResultCore(PhysicalConnection connection, Message mes ...@@ -498,6 +500,40 @@ protected override bool SetResultCore(PhysicalConnection connection, Message mes
} }
} }
internal sealed class SortedSetEntryProcessor : ResultProcessor<SortedSetEntry?>
{
public bool TryParse(in RawResult result, out SortedSetEntry? entry)
{
switch (result.Type)
{
case ResultType.MultiBulk:
var arr = result.GetItems();
if (result.IsNull || arr.Length < 2)
{
entry = null;
}
else
{
entry = new SortedSetEntry(arr[0].AsRedisValue(), arr[1].TryGetDouble(out double val) ? val : double.NaN);
}
return true;
default:
entry = null;
return false;
}
}
protected override bool SetResultCore(PhysicalConnection connection, Message message, in RawResult result)
{
if (TryParse(result, out SortedSetEntry? entry))
{
SetResult(message, entry);
return true;
}
return false;
}
}
internal sealed class SortedSetEntryArrayProcessor : ValuePairInterleavedProcessorBase<SortedSetEntry> internal sealed class SortedSetEntryArrayProcessor : ValuePairInterleavedProcessorBase<SortedSetEntry>
{ {
protected override SortedSetEntry Parse(in RawResult first, in RawResult second) protected override SortedSetEntry Parse(in RawResult first, in RawResult second)
......
using System.Threading.Tasks;
using Xunit;
using Xunit.Abstractions;
namespace StackExchange.Redis.Tests
{
[Collection(SharedConnectionFixture.Key)]
public class SortedSets : TestBase
{
public SortedSets(ITestOutputHelper output, SharedConnectionFixture fixture) : base(output, fixture) { }
public static SortedSetEntry[] entries = new SortedSetEntry[]
{
new SortedSetEntry("a", 1),
new SortedSetEntry("b", 2),
new SortedSetEntry("c", 3),
new SortedSetEntry("d", 4),
new SortedSetEntry("e", 5),
new SortedSetEntry("f", 6),
new SortedSetEntry("g", 7),
new SortedSetEntry("h", 8),
new SortedSetEntry("i", 9),
new SortedSetEntry("j", 10)
};
[Fact]
public void SortedSetPopMulti_Multi()
{
using (var conn = Create())
{
Skip.IfMissingFeature(conn, nameof(RedisFeatures.SortedSetPop), r => r.SortedSetPop);
var db = conn.GetDatabase();
var key = Me();
db.KeyDelete(key, CommandFlags.FireAndForget);
db.SortedSetAdd(key, entries, CommandFlags.FireAndForget);
var first = db.SortedSetPop(key, Order.Ascending);
Assert.True(first.HasValue);
Assert.Equal(entries[0], first.Value);
Assert.Equal(9, db.SortedSetLength(key));
var lasts = db.SortedSetPop(key, 2, Order.Descending);
Assert.Equal(2, lasts.Length);
Assert.Equal(entries[9], lasts[0]);
Assert.Equal(entries[8], lasts[1]);
Assert.Equal(7, db.SortedSetLength(key));
}
}
[Fact]
public void SortedSetPopMulti_Single()
{
using (var conn = Create())
{
Skip.IfMissingFeature(conn, nameof(RedisFeatures.SortedSetPop), r => r.SortedSetPop);
var db = conn.GetDatabase();
var key = Me();
db.KeyDelete(key, CommandFlags.FireAndForget);
db.SortedSetAdd(key, entries, CommandFlags.FireAndForget);
var last = db.SortedSetPop(key, Order.Descending);
Assert.True(last.HasValue);
Assert.Equal(entries[9], last.Value);
Assert.Equal(9, db.SortedSetLength(key));
var firsts = db.SortedSetPop(key, 1, Order.Ascending);
Assert.Single(firsts);
Assert.Equal(entries[0], firsts[0]);
Assert.Equal(8, db.SortedSetLength(key));
}
}
[Fact]
public async Task SortedSetPopMulti_Multi_Async()
{
using (var conn = Create())
{
Skip.IfMissingFeature(conn, nameof(RedisFeatures.SortedSetPop), r => r.SortedSetPop);
var db = conn.GetDatabase();
var key = Me();
db.KeyDelete(key, CommandFlags.FireAndForget);
db.SortedSetAdd(key, entries, CommandFlags.FireAndForget);
var last = await db.SortedSetPopAsync(key, Order.Descending).ForAwait();
Assert.True(last.HasValue);
Assert.Equal(entries[9], last.Value);
Assert.Equal(9, db.SortedSetLength(key));
var moreLasts = await db.SortedSetPopAsync(key, 2, Order.Descending).ForAwait();
Assert.Equal(2, moreLasts.Length);
Assert.Equal(entries[8], moreLasts[0]);
Assert.Equal(entries[7], moreLasts[1]);
Assert.Equal(7, db.SortedSetLength(key));
}
}
[Fact]
public async Task SortedSetPopMulti_Single_Async()
{
using (var conn = Create())
{
Skip.IfMissingFeature(conn, nameof(RedisFeatures.SortedSetPop), r => r.SortedSetPop);
var db = conn.GetDatabase();
var key = Me();
db.KeyDelete(key, CommandFlags.FireAndForget);
db.SortedSetAdd(key, entries, CommandFlags.FireAndForget);
var first = await db.SortedSetPopAsync(key).ForAwait();
Assert.True(first.HasValue);
Assert.Equal(entries[0], first.Value);
Assert.Equal(9, db.SortedSetLength(key));
var moreFirsts = await db.SortedSetPopAsync(key, 1).ForAwait();
Assert.Single(moreFirsts);
Assert.Equal(entries[1], moreFirsts[0]);
Assert.Equal(8, db.SortedSetLength(key));
}
}
[Fact]
public async Task SortedSetPopMulti_Zero_Async()
{
using (var conn = Create())
{
Skip.IfMissingFeature(conn, nameof(RedisFeatures.SortedSetPop), r => r.SortedSetPop);
var db = conn.GetDatabase();
var key = Me();
db.KeyDelete(key, CommandFlags.FireAndForget);
db.SortedSetAdd(key, entries, CommandFlags.FireAndForget);
var t = db.SortedSetPopAsync(key, count: 0);
Assert.True(t.IsCompleted); // sync
var arr = await t;
Assert.Empty(arr);
Assert.Equal(10, db.SortedSetLength(key));
}
}
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment