Commit 7710017f authored by Marc Gravell's avatar Marc Gravell

Network investigation

parent c9c1b040
...@@ -1702,14 +1702,14 @@ internal T ExecuteSyncImpl<T>(Message message, ResultProcessor<T> processor, Ser ...@@ -1702,14 +1702,14 @@ internal T ExecuteSyncImpl<T>(Message message, ResultProcessor<T> processor, Ser
} }
else else
{ {
int inst, qu, qs, qc, wr, wq, @in; int inst, qu, qs, qc, wr, wq, @in, ar;
int queue = server.GetOutstandingCount(message.Command, out inst, out qu, out qs, out qc, out wr, out wq, out @in); int queue = server.GetOutstandingCount(message.Command, out inst, out qu, out qs, out qc, out wr, out wq, out @in, out ar);
var sb = new StringBuilder("Timeout performing ").Append(message.CommandAndKey) var sb = new StringBuilder("Timeout performing ").Append(message.CommandAndKey)
.Append(", inst: ").Append(inst) .Append(", inst: ").Append(inst)
.Append(", queue: ").Append(queue).Append(", qu=").Append(qu) .Append(", queue: ").Append(queue).Append(", qu=").Append(qu)
.Append(", qs=").Append(qs).Append(", qc=").Append(qc) .Append(", qs=").Append(qs).Append(", qc=").Append(qc)
.Append(", wr=").Append(wr).Append("/").Append(wq) .Append(", wr=").Append(wr).Append("/").Append(wq)
.Append(", in=").Append(@in); .Append(", in=").Append(@in).Append("/").Append(ar);
errMessage = sb.ToString(); errMessage = sb.ToString();
if (stormLogThreshold >= 0 && queue >= stormLogThreshold && Interlocked.CompareExchange(ref haveStormLog, 1, 0) == 0) if (stormLogThreshold >= 0 && queue >= stormLogThreshold && Interlocked.CompareExchange(ref haveStormLog, 1, 0) == 0)
......
...@@ -209,18 +209,18 @@ internal void GetCounters(ConnectionCounters counters) ...@@ -209,18 +209,18 @@ internal void GetCounters(ConnectionCounters counters)
} }
} }
internal int GetOutstandingCount(out int inst, out int qu, out int qs, out int qc, out int wr, out int wq, out int @in) internal int GetOutstandingCount(out int inst, out int qu, out int qs, out int qc, out int wr, out int wq, out int @in, out int ar)
{// defined as: PendingUnsentItems + SentItemsAwaitingResponse + ResponsesAwaitingAsyncCompletion {// defined as: PendingUnsentItems + SentItemsAwaitingResponse + ResponsesAwaitingAsyncCompletion
inst = (int)(Interlocked.Read(ref operationCount) - Interlocked.Read(ref profileLastLog)); inst = (int)(Interlocked.Read(ref operationCount) - Interlocked.Read(ref profileLastLog));
qu = queue.Count(); qu = queue.Count();
var tmp = physical; var tmp = physical;
if(tmp == null) if(tmp == null)
{ {
qs = @in = 0; qs = @in = ar = 0;
} else } else
{ {
qs = tmp.GetSentAwaitingResponseCount(); qs = tmp.GetSentAwaitingResponseCount();
@in = tmp.GetAvailableInboundBytes(); @in = tmp.GetAvailableInboundBytes(out ar);
} }
qc = completionManager.GetOutstandingCount(); qc = completionManager.GetOutstandingCount();
wr = Interlocked.CompareExchange(ref activeWriters, 0, 0); wr = Interlocked.CompareExchange(ref activeWriters, 0, 0);
......
...@@ -512,9 +512,11 @@ void BeginReading() ...@@ -512,9 +512,11 @@ void BeginReading()
} }
} while (keepReading); } while (keepReading);
} }
int haveReader;
internal int GetAvailableInboundBytes() internal int GetAvailableInboundBytes(out int activeReaders)
{ {
activeReaders = Interlocked.CompareExchange(ref haveReader, 0, 0);
return this.socketToken.Available; return this.socketToken.Available;
} }
...@@ -732,6 +734,7 @@ private bool ProcessReadBytes(int bytesRead) ...@@ -732,6 +734,7 @@ private bool ProcessReadBytes(int bytesRead)
void ISocketCallback.Read() void ISocketCallback.Read()
{ {
Interlocked.Increment(ref haveReader);
try try
{ {
do do
...@@ -748,6 +751,9 @@ void ISocketCallback.Read() ...@@ -748,6 +751,9 @@ void ISocketCallback.Read()
catch (Exception ex) catch (Exception ex)
{ {
RecordConnectionFailed(ConnectionFailureType.InternalFailure, ex); RecordConnectionFailed(ConnectionFailureType.InternalFailure, ex);
}finally
{
Interlocked.Decrement(ref haveReader);
} }
} }
private RawResult ReadArray(byte[] buffer, ref int offset, ref int count) private RawResult ReadArray(byte[] buffer, ref int offset, ref int count)
......
...@@ -329,15 +329,15 @@ internal ServerCounters GetCounters() ...@@ -329,15 +329,15 @@ internal ServerCounters GetCounters()
return counters; return counters;
} }
internal int GetOutstandingCount(RedisCommand command, out int inst, out int qu, out int qs, out int qc, out int wr, out int wq, out int @in) internal int GetOutstandingCount(RedisCommand command, out int inst, out int qu, out int qs, out int qc, out int wr, out int wq, out int @in, out int ar)
{ {
var bridge = GetBridge(command, false); var bridge = GetBridge(command, false);
if (bridge == null) if (bridge == null)
{ {
return inst = qu = qs = qc = wr = wq = @in = 0; return inst = qu = qs = qc = wr = wq = @in = ar = 0;
} }
return bridge.GetOutstandingCount(out inst, out qu, out qs, out qc, out wr, out wq, out @in); return bridge.GetOutstandingCount(out inst, out qu, out qs, out qc, out wr, out wq, out @in, out ar);
} }
internal string GetProfile() internal string GetProfile()
......
...@@ -8,6 +8,8 @@ ...@@ -8,6 +8,8 @@
#if !MONO #if !MONO
namespace StackExchange.Redis namespace StackExchange.Redis
{ {
partial class SocketManager partial class SocketManager
{ {
internal const SocketMode DefaultSocketMode = SocketMode.Poll; internal const SocketMode DefaultSocketMode = SocketMode.Poll;
...@@ -115,7 +117,6 @@ private void ProcessItems() ...@@ -115,7 +117,6 @@ private void ProcessItems()
ProcessItems(socketLookup, readQueue, CallbackOperation.Read); ProcessItems(socketLookup, readQueue, CallbackOperation.Read);
ProcessItems(socketLookup, errorQueue, CallbackOperation.Error); ProcessItems(socketLookup, errorQueue, CallbackOperation.Error);
} }
private void Read() private void Read()
{ {
bool weAreReader = false; bool weAreReader = false;
......
...@@ -15,7 +15,6 @@ internal enum SocketMode ...@@ -15,7 +15,6 @@ internal enum SocketMode
Poll, Poll,
Async Async
} }
/// <summary> /// <summary>
/// Allows callbacks from SocketManager as work is discovered /// Allows callbacks from SocketManager as work is discovered
/// </summary> /// </summary>
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment