Commit 0cebe9e1 authored by Marc Gravell's avatar Marc Gravell

ensure that subscription connections are QUIT cleanly (interactive connections...

ensure that subscription connections are QUIT cleanly (interactive connections were already QUIT), *without* resurrecting like zombies
parent 4b047ec6
......@@ -1925,7 +1925,7 @@ private void DisposeAndClearServers()
private Task[] QuitAllServers()
{
Task[] quits = new Task[servers.Count];
var quits = new Task[2 * servers.Count];
lock (servers)
{
var iter = servers.GetEnumerator();
......@@ -1933,7 +1933,8 @@ private Task[] QuitAllServers()
while (iter.MoveNext())
{
var server = (ServerEndPoint)iter.Value;
quits[index++] = server.Close();
quits[index++] = server.Close(ConnectionType.Interactive);
quits[index++] = server.Close(ConnectionType.Subscription);
}
}
return quits;
......
......@@ -170,9 +170,9 @@ partial class ConnectionMultiplexer
#if LOGOUTPUT
partial class PhysicalConnection
{
partial void OnWrapForLogging(ref IDuplexPipe pipe, string name, SocketManager mgr)
partial void OnWrapForLogging(ref System.IO.Pipelines.IDuplexPipe pipe, string name, SocketManager mgr)
{
foreach(var c in Path.GetInvalidFileNameChars())
foreach(var c in System.IO.Path.GetInvalidFileNameChars())
{
name = name.Replace(c, '_');
}
......
......@@ -30,18 +30,6 @@ public virtual Task<TimeSpan> PingAsync(CommandFlags flags = CommandFlags.None)
return ExecuteAsync(msg, ResultProcessor.ResponseTimer);
}
public void Quit(CommandFlags flags = CommandFlags.None)
{
var msg = Message.Create(-1, flags, RedisCommand.QUIT);
ExecuteSync(msg, ResultProcessor.DemandOK);
}
public Task QuitAsync(CommandFlags flags = CommandFlags.None)
{
var msg = Message.Create(-1, flags, RedisCommand.QUIT);
return ExecuteAsync(msg, ResultProcessor.DemandOK);
}
public override string ToString() => multiplexer.ToString();
public bool TryWait(Task task) => task.Wait(multiplexer.TimeoutMilliseconds);
......
......@@ -320,16 +320,16 @@ internal void AutoConfigure(PhysicalConnection connection)
internal uint NextReplicaOffset() // used to round-robin between multiple replicas
=> (uint)System.Threading.Interlocked.Increment(ref _nextReplicaOffset);
internal Task Close()
internal Task Close(ConnectionType connectionType)
{
var tmp = interactive;
var tmp = GetBridge(connectionType, create: false);
if (tmp == null || !tmp.IsConnected || !Multiplexer.CommandMap.IsAvailable(RedisCommand.QUIT))
{
return CompletedTask<bool>.Default(null);
}
else
{
return WriteDirectAsync(Message.Create(-1, CommandFlags.None, RedisCommand.QUIT), ResultProcessor.DemandOK, bridge: interactive);
return WriteDirectAsync(Message.Create(-1, CommandFlags.None, RedisCommand.QUIT), ResultProcessor.DemandOK, bridge: tmp);
}
}
......@@ -636,6 +636,7 @@ internal void WriteDirectOrQueueFireAndForget<T>(PhysicalConnection connection,
}
private PhysicalBridge CreateBridge(ConnectionType type, TextWriter log)
{
if (Multiplexer.IsDisposed) return null;
Multiplexer.Trace(type.ToString());
var bridge = new PhysicalBridge(this, type, Multiplexer.TimeoutMilliseconds);
bridge.TryConnect(log);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment