Commit ad0eb82c authored by Marc Gravell's avatar Marc Gravell

fixing the last unanswered write time; number was garbage

parent 42b5a860
...@@ -742,8 +742,11 @@ private async Task<bool> WaitAllIgnoreErrorsAsync(Task[] tasks, int timeoutMilli ...@@ -742,8 +742,11 @@ private async Task<bool> WaitAllIgnoreErrorsAsync(Task[] tasks, int timeoutMilli
internal void OnHashSlotMoved(int hashSlot, EndPoint old, EndPoint @new) internal void OnHashSlotMoved(int hashSlot, EndPoint old, EndPoint @new)
{ {
var handler = HashSlotMoved; var handler = HashSlotMoved;
if (handler != null) ConnectionMultiplexer.CompleteAsWorker( if (handler != null)
new HashSlotMovedEventArgs(handler, this, hashSlot, old, @new)); {
ConnectionMultiplexer.CompleteAsWorker(
new HashSlotMovedEventArgs(handler, this, hashSlot, old, @new));
}
} }
/// <summary> /// <summary>
......
...@@ -666,6 +666,7 @@ internal void SetWriteTime() ...@@ -666,6 +666,7 @@ internal void SetWriteTime()
} }
} }
private int _writeTickCount; private int _writeTickCount;
public int GetWriteTime() => Volatile.Read(ref _writeTickCount);
private void SetNeedsTimeoutCheck() => Flags |= NeedsAsyncTimeoutCheckFlag; private void SetNeedsTimeoutCheck() => Flags |= NeedsAsyncTimeoutCheckFlag;
internal bool HasAsyncTimedOut(int now, int timeoutMilliseconds, out int millisecondsTaken) internal bool HasAsyncTimedOut(int now, int timeoutMilliseconds, out int millisecondsTaken)
......
...@@ -286,9 +286,9 @@ private void ShutdownSubscriptionQueue() ...@@ -286,9 +286,9 @@ private void ShutdownSubscriptionQueue()
} }
} }
internal bool TryEnqueueBackgroundSubscriptionWrite(PendingSubscriptionState state) internal bool TryEnqueueBackgroundSubscriptionWrite(in PendingSubscriptionState state)
=> isDisposed ? false : (_subscriptionBackgroundQueue ?? GetSubscriptionQueue()).Writer.TryWrite(state); => isDisposed ? false : (_subscriptionBackgroundQueue ?? GetSubscriptionQueue()).Writer.TryWrite(state);
internal void GetOutstandingCount(out int inst, out int qs, out int @in, out int qu) internal void GetOutstandingCount(out int inst, out int qs, out int @in, out int qu)
{ {
inst = (int)(Interlocked.Read(ref operationCount) - Interlocked.Read(ref profileLastLog)); inst = (int)(Interlocked.Read(ref operationCount) - Interlocked.Read(ref profileLastLog));
...@@ -443,7 +443,6 @@ private void AbandonPendingBacklog(Exception ex) ...@@ -443,7 +443,6 @@ private void AbandonPendingBacklog(Exception ex)
Message next; Message next;
do do
{ {
lock (_backlog) lock (_backlog)
{ {
next = _backlog.Count == 0 ? null : _backlog.Dequeue(); next = _backlog.Count == 0 ? null : _backlog.Dequeue();
...@@ -737,11 +736,10 @@ private void StartBacklogProcessor() ...@@ -737,11 +736,10 @@ private void StartBacklogProcessor()
sched.Schedule(s_ProcessBacklog, _weakRefThis); sched.Schedule(s_ProcessBacklog, _weakRefThis);
} }
static readonly Action<object> s_ProcessBacklog = s => private static readonly Action<object> s_ProcessBacklog = s =>
{ {
var wr = (WeakReference)s; var wr = (WeakReference)s;
var bridge = wr.Target as PhysicalBridge; if (wr.Target is PhysicalBridge bridge) bridge.ProcessBacklog();
if (bridge != null) bridge.ProcessBacklog();
}; };
private void CheckBacklogForTimeouts() // check the head of the backlog queue, consuming anything that looks dead private void CheckBacklogForTimeouts() // check the head of the backlog queue, consuming anything that looks dead
......
...@@ -52,7 +52,6 @@ private static readonly Message ...@@ -52,7 +52,6 @@ private static readonly Message
private int failureReported; private int failureReported;
private int lastWriteTickCount, lastReadTickCount, lastBeatTickCount; private int lastWriteTickCount, lastReadTickCount, lastBeatTickCount;
private int firstUnansweredWriteTickCount;
internal void GetBytes(out long sent, out long received) internal void GetBytes(out long sent, out long received)
{ {
...@@ -88,7 +87,6 @@ public PhysicalConnection(PhysicalBridge bridge) ...@@ -88,7 +87,6 @@ public PhysicalConnection(PhysicalBridge bridge)
internal async Task BeginConnectAsync(TextWriter log) internal async Task BeginConnectAsync(TextWriter log)
{ {
Thread.VolatileWrite(ref firstUnansweredWriteTickCount, 0);
var bridge = BridgeCouldBeNull; var bridge = BridgeCouldBeNull;
var endpoint = bridge?.ServerEndPoint?.EndPoint; var endpoint = bridge?.ServerEndPoint?.EndPoint;
if (endpoint == null) if (endpoint == null)
...@@ -324,7 +322,17 @@ public Task FlushAsync() ...@@ -324,7 +322,17 @@ public Task FlushAsync()
{ {
int now = Environment.TickCount, lastRead = Thread.VolatileRead(ref lastReadTickCount), lastWrite = Thread.VolatileRead(ref lastWriteTickCount), int now = Environment.TickCount, lastRead = Thread.VolatileRead(ref lastReadTickCount), lastWrite = Thread.VolatileRead(ref lastWriteTickCount),
lastBeat = Thread.VolatileRead(ref lastBeatTickCount); lastBeat = Thread.VolatileRead(ref lastBeatTickCount);
int unansweredRead = Thread.VolatileRead(ref firstUnansweredWriteTickCount);
int unansweredWriteTime = 0;
lock (_writtenAwaitingResponse)
{
// find oldest message awaiting a response
if (_writtenAwaitingResponse.Count != 0)
{
var next = _writtenAwaitingResponse.Peek();
unansweredWriteTime = next.GetWriteTime();
}
}
var exMessage = new StringBuilder(failureType.ToString()); var exMessage = new StringBuilder(failureType.ToString());
...@@ -371,7 +379,7 @@ void add(string lk, string sk, string v) ...@@ -371,7 +379,7 @@ void add(string lk, string sk, string v)
add("Outstanding-Responses", "outstanding", GetSentAwaitingResponseCount().ToString()); add("Outstanding-Responses", "outstanding", GetSentAwaitingResponseCount().ToString());
add("Last-Read", "last-read", (unchecked(now - lastRead) / 1000) + "s ago"); add("Last-Read", "last-read", (unchecked(now - lastRead) / 1000) + "s ago");
add("Last-Write", "last-write", (unchecked(now - lastWrite) / 1000) + "s ago"); add("Last-Write", "last-write", (unchecked(now - lastWrite) / 1000) + "s ago");
add("Unanswered-Write", "unanswered-write", (unchecked(now - unansweredRead) / 1000) + "s ago"); if(unansweredWriteTime != 0) add("Unanswered-Write", "unanswered-write", (unchecked(now - unansweredWriteTime) / 1000) + "s ago");
add("Keep-Alive", "keep-alive", bridge.ServerEndPoint?.WriteEverySeconds + "s"); add("Keep-Alive", "keep-alive", bridge.ServerEndPoint?.WriteEverySeconds + "s");
add("Previous-Physical-State", "state", oldState.ToString()); add("Previous-Physical-State", "state", oldState.ToString());
add("Manager", "mgr", bridge.Multiplexer.SocketManager?.GetState()); add("Manager", "mgr", bridge.Multiplexer.SocketManager?.GetState());
...@@ -696,9 +704,6 @@ internal void WriteHeader(RedisCommand command, int arguments, CommandBytes comm ...@@ -696,9 +704,6 @@ internal void WriteHeader(RedisCommand command, int arguments, CommandBytes comm
// ExecuteMessage should have dealt with everything else // ExecuteMessage should have dealt with everything else
if (commandBytes.IsEmpty) throw ExceptionFactory.CommandDisabled(command); if (commandBytes.IsEmpty) throw ExceptionFactory.CommandDisabled(command);
// remember the time of the first write that still not followed by read
Interlocked.CompareExchange(ref firstUnansweredWriteTickCount, Environment.TickCount, 0);
// *{argCount}\r\n = 3 + MaxInt32TextLen // *{argCount}\r\n = 3 + MaxInt32TextLen
// ${cmd-len}\r\n = 3 + MaxInt32TextLen // ${cmd-len}\r\n = 3 + MaxInt32TextLen
// {cmd}\r\n = 2 + commandBytes.Length // {cmd}\r\n = 2 + commandBytes.Length
......
...@@ -291,7 +291,7 @@ internal bool GetBoolean() ...@@ -291,7 +291,7 @@ internal bool GetBoolean()
return AsGeoPosition(root.GetItems()); return AsGeoPosition(root.GetItems());
} }
static GeoPosition AsGeoPosition(Sequence<RawResult> coords) private static GeoPosition AsGeoPosition(in Sequence<RawResult> coords)
{ {
double longitude, latitude; double longitude, latitude;
if (coords.IsSingleSegment) if (coords.IsSingleSegment)
......
...@@ -17,7 +17,7 @@ internal static void CompleteAsWorker(ICompletable completable) ...@@ -17,7 +17,7 @@ internal static void CompleteAsWorker(ICompletable completable)
if (completable != null) ThreadPool.QueueUserWorkItem(s_CompleteAsWorker, completable); if (completable != null) ThreadPool.QueueUserWorkItem(s_CompleteAsWorker, completable);
} }
static readonly WaitCallback s_CompleteAsWorker = s => ((ICompletable)s).TryComplete(true); private static readonly WaitCallback s_CompleteAsWorker = s => ((ICompletable)s).TryComplete(true);
internal static bool TryCompleteHandler<T>(EventHandler<T> handler, object sender, T args, bool isAsync) where T : EventArgs, ICompletable internal static bool TryCompleteHandler<T>(EventHandler<T> handler, object sender, T args, bool isAsync) where T : EventArgs, ICompletable
{ {
...@@ -261,7 +261,6 @@ public static PendingSubscriptionState Create(RedisChannel channel, Subscription ...@@ -261,7 +261,6 @@ public static PendingSubscriptionState Create(RedisChannel channel, Subscription
private PendingSubscriptionState(object asyncState, RedisChannel channel, Subscription subscription, CommandFlags flags, bool subscribe, bool internalCall, bool isSlave) private PendingSubscriptionState(object asyncState, RedisChannel channel, Subscription subscription, CommandFlags flags, bool subscribe, bool internalCall, bool isSlave)
{ {
var cmd = subscribe var cmd = subscribe
? (channel.IsPatternBased ? RedisCommand.PSUBSCRIBE : RedisCommand.SUBSCRIBE) ? (channel.IsPatternBased ? RedisCommand.PSUBSCRIBE : RedisCommand.SUBSCRIBE)
: (channel.IsPatternBased ? RedisCommand.PUNSUBSCRIBE : RedisCommand.UNSUBSCRIBE); : (channel.IsPatternBased ? RedisCommand.PUNSUBSCRIBE : RedisCommand.UNSUBSCRIBE);
......
...@@ -1114,7 +1114,7 @@ public RedisChannelArrayProcessor(RedisChannel.PatternMode mode) ...@@ -1114,7 +1114,7 @@ public RedisChannelArrayProcessor(RedisChannel.PatternMode mode)
this.mode = mode; this.mode = mode;
} }
readonly struct ChannelState // I would use a value-tuple here, but that is binding hell private readonly struct ChannelState // I would use a value-tuple here, but that is binding hell
{ {
public readonly byte[] Prefix; public readonly byte[] Prefix;
public readonly RedisChannel.PatternMode Mode; public readonly RedisChannel.PatternMode Mode;
......
...@@ -8,7 +8,9 @@ internal sealed class ServerSelectionStrategy ...@@ -8,7 +8,9 @@ internal sealed class ServerSelectionStrategy
{ {
public const int NoSlot = -1, MultipleSlots = -2; public const int NoSlot = -1, MultipleSlots = -2;
private const int RedisClusterSlotCount = 16384; private const int RedisClusterSlotCount = 16384;
#pragma warning disable IDE1006 // Naming Styles
private static ReadOnlySpan<ushort> s_crc16tab => new ushort[] private static ReadOnlySpan<ushort> s_crc16tab => new ushort[]
#pragma warning restore IDE1006 // Naming Styles
{ // this syntax allows a special-case population implementation by the compiler/JIT { // this syntax allows a special-case population implementation by the compiler/JIT
0x0000,0x1021,0x2042,0x3063,0x4084,0x50a5,0x60c6,0x70e7, 0x0000,0x1021,0x2042,0x3063,0x4084,0x50a5,0x60c6,0x70e7,
0x8108,0x9129,0xa14a,0xb16b,0xc18c,0xd1ad,0xe1ce,0xf1ef, 0x8108,0x9129,0xa14a,0xb16b,0xc18c,0xd1ad,0xe1ce,0xf1ef,
...@@ -72,21 +74,23 @@ private static unsafe int GetClusterSlot(in RedisKey key) ...@@ -72,21 +74,23 @@ private static unsafe int GetClusterSlot(in RedisKey key)
{ {
var blob = (byte[])key; var blob = (byte[])key;
fixed (byte* ptr = blob) fixed (byte* ptr = blob)
fixed (ushort* crc16tab = s_crc16tab)
{ {
int offset = 0, count = blob.Length, start, end; fixed (ushort* crc16tab = s_crc16tab)
if ((start = IndexOf(ptr, (byte)'{', 0, count - 1)) >= 0
&& (end = IndexOf(ptr, (byte)'}', start + 1, count)) >= 0
&& --end != start)
{ {
offset = start + 1; int offset = 0, count = blob.Length, start, end;
count = end - start; // note we already subtracted one via --end if ((start = IndexOf(ptr, (byte)'{', 0, count - 1)) >= 0
} && (end = IndexOf(ptr, (byte)'}', start + 1, count)) >= 0
&& --end != start)
{
offset = start + 1;
count = end - start; // note we already subtracted one via --end
}
uint crc = 0; uint crc = 0;
for (int i = 0; i < count; i++) for (int i = 0; i < count; i++)
crc = ((crc << 8) ^ crc16tab[((crc >> 8) ^ ptr[offset++]) & 0x00FF]) & 0x0000FFFF; crc = ((crc << 8) ^ crc16tab[((crc >> 8) ^ ptr[offset++]) & 0x00FF]) & 0x0000FFFF;
return (int)(crc % RedisClusterSlotCount); return (int)(crc % RedisClusterSlotCount);
}
} }
} }
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment