Commit 4a02a425 authored by Marc Gravell's avatar Marc Gravell

Make publish-reconfigure dependent on a completed configuration check when reasonable

parent dd3f37ac
...@@ -15,8 +15,9 @@ public class Config : TestBase ...@@ -15,8 +15,9 @@ public class Config : TestBase
public void VerifyReceiveConfigChangeBroadcast() public void VerifyReceiveConfigChangeBroadcast()
{ {
var config = this.GetConfiguration();
using (var sender = Create(allowAdmin: true)) using (var sender = Create(allowAdmin: true))
using (var receiver = Create()) using (var receiver = Create(syncTimeout: 2000))
{ {
int total = 0; int total = 0;
receiver.ConfigurationChangedBroadcast += (s, a) => receiver.ConfigurationChangedBroadcast += (s, a) =>
...@@ -24,12 +25,12 @@ public void VerifyReceiveConfigChangeBroadcast() ...@@ -24,12 +25,12 @@ public void VerifyReceiveConfigChangeBroadcast()
Console.WriteLine("Config changed: " + (a.EndPoint == null ? "(none)" : a.EndPoint.ToString())); Console.WriteLine("Config changed: " + (a.EndPoint == null ? "(none)" : a.EndPoint.ToString()));
Interlocked.Increment(ref total); Interlocked.Increment(ref total);
}; };
Thread.Sleep(500);
// send a reconfigure/reconnect message // send a reconfigure/reconnect message
long count = sender.PublishReconfigure(); long count = sender.PublishReconfigure();
GetServer(receiver).Ping(); GetServer(receiver).Ping();
GetServer(receiver).Ping(); GetServer(receiver).Ping();
Assert.IsTrue(count >= 2, "subscribers"); Assert.IsTrue(count == -1 || count >= 2, "subscribers");
Assert.IsTrue(Interlocked.CompareExchange(ref total, 0, 0) >= 1, "total (1st)"); Assert.IsTrue(Interlocked.CompareExchange(ref total, 0, 0) >= 1, "total (1st)");
Interlocked.Exchange(ref total, 0); Interlocked.Exchange(ref total, 0);
...@@ -38,6 +39,7 @@ public void VerifyReceiveConfigChangeBroadcast() ...@@ -38,6 +39,7 @@ public void VerifyReceiveConfigChangeBroadcast()
var server = GetServer(sender); var server = GetServer(sender);
if (server.IsSlave) Assert.Inconclusive("didn't expect a slave"); if (server.IsSlave) Assert.Inconclusive("didn't expect a slave");
server.MakeMaster(ReplicationChangeOptions.Broadcast); server.MakeMaster(ReplicationChangeOptions.Broadcast);
Thread.Sleep(100);
GetServer(receiver).Ping(); GetServer(receiver).Ping();
GetServer(receiver).Ping(); GetServer(receiver).Ping();
Assert.IsTrue(Interlocked.CompareExchange(ref total, 0, 0) >= 1, "total (2nd)"); Assert.IsTrue(Interlocked.CompareExchange(ref total, 0, 0) >= 1, "total (2nd)");
......
...@@ -617,7 +617,7 @@ internal ServerEndPoint AnyConnected(ServerType serverType, uint startOffset, Re ...@@ -617,7 +617,7 @@ internal ServerEndPoint AnyConnected(ServerType serverType, uint startOffset, Re
for (int i = 0; i < len; i++) for (int i = 0; i < len; i++)
{ {
var server = tmp[(int)(((uint)i + startOffset) % len)]; var server = tmp[(int)(((uint)i + startOffset) % len)];
if (server.ServerType == serverType && server.IsSelectable(command)) if (server != null && server.ServerType == serverType && server.IsSelectable(command))
{ {
if (server.IsSlave) if (server.IsSlave)
{ {
...@@ -972,7 +972,7 @@ internal static void TraceWithoutContext(bool condition, string message, [System ...@@ -972,7 +972,7 @@ internal static void TraceWithoutContext(bool condition, string message, [System
string activeConfigCause; string activeConfigCause;
internal void ReconfigureIfNeeded(EndPoint blame, bool fromBroadcast, string cause) internal bool ReconfigureIfNeeded(EndPoint blame, bool fromBroadcast, string cause, bool publishReconfigure = false, CommandFlags flags = CommandFlags.None)
{ {
if (fromBroadcast) if (fromBroadcast)
{ {
...@@ -981,12 +981,14 @@ internal void ReconfigureIfNeeded(EndPoint blame, bool fromBroadcast, string cau ...@@ -981,12 +981,14 @@ internal void ReconfigureIfNeeded(EndPoint blame, bool fromBroadcast, string cau
string activeCause = Interlocked.CompareExchange(ref activeConfigCause, null, null); string activeCause = Interlocked.CompareExchange(ref activeConfigCause, null, null);
if (activeCause == null) if (activeCause == null)
{ {
bool reconfigureAll = fromBroadcast; bool reconfigureAll = fromBroadcast || publishReconfigure;
Trace("Configuration change detected; checking nodes", "Configuration"); Trace("Configuration change detected; checking nodes", "Configuration");
ReconfigureAsync(false, reconfigureAll, null, blame, cause).ObserveErrors(); ReconfigureAsync(false, reconfigureAll, null, blame, cause, publishReconfigure, flags).ObserveErrors();
return true;
} else } else
{ {
Trace("Configuration change skipped; already in progress via " + activeCause, "Configuration"); Trace("Configuration change skipped; already in progress via " + activeCause, "Configuration");
return false;
} }
} }
...@@ -1057,7 +1059,7 @@ public void GetStatus(TextWriter log) ...@@ -1057,7 +1059,7 @@ public void GetStatus(TextWriter log)
LogLocked(log, "Sync timeouts: {0}; fire and forget: {1}; last heartbeat: {2}s ago", LogLocked(log, "Sync timeouts: {0}; fire and forget: {1}; last heartbeat: {2}s ago",
Interlocked.Read(ref syncTimeouts), Interlocked.Read(ref fireAndForgets), LastHeartbeatSecondsAgo); Interlocked.Read(ref syncTimeouts), Interlocked.Read(ref fireAndForgets), LastHeartbeatSecondsAgo);
} }
internal async Task<bool> ReconfigureAsync(bool first, bool reconfigureAll, TextWriter log, EndPoint blame, string cause) internal async Task<bool> ReconfigureAsync(bool first, bool reconfigureAll, TextWriter log, EndPoint blame, string cause, bool publishReconfigure = false, CommandFlags publishReconfigureFlags = CommandFlags.None)
{ {
if (isDisposed) throw new ObjectDisposedException(ToString()); if (isDisposed) throw new ObjectDisposedException(ToString());
//if connection failed treat it as first to honor retry logic. //if connection failed treat it as first to honor retry logic.
...@@ -1328,6 +1330,16 @@ internal async Task<bool> ReconfigureAsync(bool first, bool reconfigureAll, Text ...@@ -1328,6 +1330,16 @@ internal async Task<bool> ReconfigureAsync(bool first, bool reconfigureAll, Text
LogLocked(log, "Starting heartbeat..."); LogLocked(log, "Starting heartbeat...");
pulse = new Timer(heartbeat, this, MillisecondsPerHeartbeat, MillisecondsPerHeartbeat); pulse = new Timer(heartbeat, this, MillisecondsPerHeartbeat, MillisecondsPerHeartbeat);
} }
if(publishReconfigure)
{
try
{
LogLocked(log, "Broadcasting reconfigure...");
PublishReconfigureImpl(publishReconfigureFlags);
}
catch
{ }
}
return true; return true;
} catch (Exception ex) } catch (Exception ex)
...@@ -1817,12 +1829,24 @@ public void ResetStormLog() ...@@ -1817,12 +1829,24 @@ public void ResetStormLog()
/// <summary> /// <summary>
/// Request all compatible clients to reconfigure or reconnect /// Request all compatible clients to reconfigure or reconnect
/// </summary> /// </summary>
/// <returns>The number of instances known to have received the message (however, the actual number can be higher)</returns> /// <returns>The number of instances known to have received the message (however, the actual number can be higher; returns -1 if the operation is pending)</returns>
public long PublishReconfigure(CommandFlags flags = CommandFlags.None) public long PublishReconfigure(CommandFlags flags = CommandFlags.None)
{ {
byte[] channel = ConfigurationChangedChannel; byte[] channel = ConfigurationChangedChannel;
if (channel == null) return 0; if (channel == null) return 0;
if (ReconfigureIfNeeded(null, false, "PublishReconfigure", true, flags))
{
return -1;
}
else
{
return PublishReconfigureImpl(flags);
}
}
private long PublishReconfigureImpl(CommandFlags flags)
{
byte[] channel = ConfigurationChangedChannel;
if (channel == null) return 0;
return GetSubscriber().Publish(channel, RedisLiterals.Wildcard, flags); return GetSubscriber().Publish(channel, RedisLiterals.Wildcard, flags);
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment