Commit 9a0f258e authored by Marc Gravell's avatar Marc Gravell

RedisValue internals tweaked; writing complete; read not started

parent 4f182469
using System;
using System.Collections.Generic;
using System.Globalization;
using Xunit;
......@@ -64,6 +65,13 @@ public void TestValues()
private void CheckSame(RedisValue x, RedisValue y)
{
Assert.True(Equals(x, y));
Assert.True(Equals(y, x));
Assert.True(EqualityComparer<RedisValue>.Default.Equals(x, y));
Assert.True(EqualityComparer<RedisValue>.Default.Equals(y, x));
Assert.True(x == y);
Assert.True(y == x);
Assert.False(x != y);
Assert.False(y != x);
Assert.True(x.Equals(y));
Assert.True(y.Equals(x));
Assert.True(x.GetHashCode() == y.GetHashCode());
......@@ -72,6 +80,13 @@ private void CheckSame(RedisValue x, RedisValue y)
private void CheckNotSame(RedisValue x, RedisValue y)
{
Assert.False(Equals(x, y));
Assert.False(Equals(y, x));
Assert.False(EqualityComparer<RedisValue>.Default.Equals(x, y));
Assert.False(EqualityComparer<RedisValue>.Default.Equals(y, x));
Assert.False(x == y);
Assert.False(y == x);
Assert.True(x != y);
Assert.True(y != x);
Assert.False(x.Equals(y));
Assert.False(y.Equals(x));
Assert.False(x.GetHashCode() == y.GetHashCode()); // well, very unlikely
......@@ -107,9 +122,9 @@ private void CheckNull(RedisValue value)
Assert.Equal(0L, (long)value);
CheckSame(value, value);
CheckSame(value, default(RedisValue));
CheckSame(value, (string)null);
CheckSame(value, (byte[])null);
//CheckSame(value, default(RedisValue));
//CheckSame(value, (string)null);
//CheckSame(value, (byte[])null);
}
[Fact]
......
......@@ -11,6 +11,8 @@
<SignAssembly>true</SignAssembly>
<PublicSign Condition=" '$(OS)' != 'Windows_NT' ">true</PublicSign>
<LangVersion>latest</LangVersion>
<!--<DefineConstants>$(DefineConstants);LOGOUTPUT</DefineConstants>-->
</PropertyGroup>
<ItemGroup>
......
using System;
using System.Diagnostics;
using System.IO;
using System.IO.Pipelines;
using System.Runtime.InteropServices;
using System.Text;
using System.Threading;
......@@ -324,28 +326,32 @@ partial class ConnectionMultiplexer
partial class PhysicalConnection
{
private Stream echo;
partial void OnCreateEcho()
//private Stream echo;
//partial void OnCreateEcho()
//{
// if (!string.IsNullOrEmpty(ConnectionMultiplexer.EchoPath))
// {
// string fullPath = Path.Combine(ConnectionMultiplexer.EchoPath,
// Regex.Replace(physicalName, @"[\-\.\@\#\:]", "_"));
// echo = File.Open(Path.ChangeExtension(fullPath, "txt"), FileMode.Create, FileAccess.Write, FileShare.ReadWrite);
// }
//}
//partial void OnCloseEcho()
//{
// if (echo != null)
// {
// try { echo.Close(); } catch { }
// try { echo.Dispose(); } catch { }
// echo = null;
// }
//}
partial void OnWrapForLogging(ref IDuplexPipe pipe, string name)
{
if (!string.IsNullOrEmpty(ConnectionMultiplexer.EchoPath))
foreach(var c in Path.GetInvalidFileNameChars())
{
string fullPath = Path.Combine(ConnectionMultiplexer.EchoPath,
Regex.Replace(physicalName, @"[\-\.\@\#\:]", "_"));
echo = File.Open(Path.ChangeExtension(fullPath, "txt"), FileMode.Create, FileAccess.Write, FileShare.ReadWrite);
name = name.Replace(c, '_');
}
}
partial void OnCloseEcho()
{
if (echo != null)
{
try { echo.Close(); } catch { }
try { echo.Dispose(); } catch { }
echo = null;
}
}
partial void OnWrapForLogging(ref Stream stream, string name)
{
stream = new LoggingTextStream(stream, physicalName, echo);
pipe = new LoggingPipe(pipe, $"{name}.in", $"{name}.out");
}
}
#endif
......
namespace StackExchange.Redis
using System;
using System.Buffers;
using System.IO;
using System.IO.Pipelines;
using System.Runtime.InteropServices;
namespace StackExchange.Redis
{
#if LOGOUTPUT
sealed class LoggingTextStream : Stream
sealed class LoggingPipe : IDuplexPipe
{
[Conditional("VERBOSE")]
void Trace(string value, [CallerMemberName] string caller = null)
{
Debug.WriteLine(value, this.category + ":" + caller);
}
[Conditional("VERBOSE")]
void Trace(char value, [CallerMemberName] string caller = null)
{
Debug.WriteLine(value, this.category + ":" + caller);
}
private IDuplexPipe _inner;
private readonly Stream stream, echo;
private readonly string category;
public LoggingTextStream(Stream stream, string category, Stream echo)
public LoggingPipe(IDuplexPipe inner, string inPath, string outPath)
{
if (stream == null) throw new ArgumentNullException("stream");
if (string.IsNullOrWhiteSpace(category)) category = GetType().Name;
this.stream = stream;
this.category = category;
this.echo = echo;
}
public override IAsyncResult BeginRead(byte[] buffer, int offset, int count, AsyncCallback callback, object state)
{
asyncBuffer = buffer;
asyncOffset = offset;
asyncCount = count;
return stream.BeginRead(buffer, offset, count, callback, state);
}
private volatile byte[] asyncBuffer;
private volatile int asyncOffset, asyncCount;
public override int EndRead(IAsyncResult asyncResult)
{
int bytes = stream.EndRead(asyncResult);
if (bytes <= 0)
_inner = inner;
if (string.IsNullOrWhiteSpace(inPath))
{
Trace("<EOF>");
Input = inner.Input;
}
else
{
Trace(Encoding.UTF8.GetString(asyncBuffer, asyncOffset, asyncCount));
}
return bytes;
}
public override bool CanRead { get { return stream.CanRead; } }
public override bool CanSeek { get { return stream.CanSeek; } }
public override bool CanWrite { get { return stream.CanWrite; } }
public override bool CanTimeout { get { return stream.CanTimeout; } }
public override long Length { get { return stream.Length; } }
public override long Position
{
get { return stream.Position; }
set { stream.Position = value; }
}
public override int ReadTimeout
{
get { return stream.ReadTimeout; }
set { stream.ReadTimeout = value; }
}
public override int WriteTimeout
{
get { return stream.WriteTimeout; }
set { stream.WriteTimeout = value; }
}
protected override void Dispose(bool disposing)
{
if (disposing)
{
stream.Dispose();
if (echo != null) echo.Flush();
}
base.Dispose(disposing);
}
public override void Close()
{
Trace("Close");
stream.Close();
if (echo != null) echo.Close();
base.Close();
}
public override void Flush()
{
Trace("Flush");
stream.Flush();
if (echo != null) echo.Flush();
}
public override long Seek(long offset, SeekOrigin origin)
{
return stream.Seek(offset, origin);
}
public override void SetLength(long value)
{
stream.SetLength(value);
}
public override void WriteByte(byte value)
{
Trace((char)value);
stream.WriteByte(value);
if (echo != null) echo.WriteByte(value);
}
public override int ReadByte()
{
int value = stream.ReadByte();
if(value < 0)
{
Trace("<EOF>");
} else
{
Trace((char)value);
var pipe = new Pipe();
Input = pipe.Reader;
CloneAsync(inPath, inner.Input, pipe.Writer);
}
return value;
}
public override int Read(byte[] buffer, int offset, int count)
{
int bytes = stream.Read(buffer, offset, count);
if(bytes <= 0)
if (string.IsNullOrWhiteSpace(outPath))
{
Trace("<EOF>");
Output = inner.Output;
}
else
{
Trace(Encoding.UTF8.GetString(buffer, offset, bytes));
var pipe = new Pipe();
Output = pipe.Writer;
CloneAsync(outPath, pipe.Reader, inner.Output);
}
return bytes;
}
public override void Write(byte[] buffer, int offset, int count)
private async void CloneAsync(string path, PipeReader from, PipeWriter to)
{
if (count != 0)
to.OnReaderCompleted((ex, o) => ((PipeReader)o).Complete(ex), from);
from.OnWriterCompleted((ex, o) => ((PipeWriter)o).Complete(ex), to);
while(true)
{
Trace(Encoding.UTF8.GetString(buffer, offset, count));
var result = await from.ReadAsync();
var buffer = result.Buffer;
if (result.IsCompleted && buffer.IsEmpty) break;
using (var file = new FileStream(path, FileMode.Append, FileAccess.Write))
{
foreach (var segment in buffer)
{
// append it to the file
bool leased = false;
if (!MemoryMarshal.TryGetArray(segment, out var arr))
{
var tmp = ArrayPool<byte>.Shared.Rent(segment.Length);
segment.CopyTo(tmp);
arr = new ArraySegment<byte>(tmp, 0, segment.Length);
leased = true;
}
await file.WriteAsync(arr.Array, arr.Offset, arr.Count);
await file.FlushAsync();
if (leased) ArrayPool<byte>.Shared.Return(arr.Array);
// and flush it upstream
await to.WriteAsync(segment);
}
}
from.AdvanceTo(buffer.End);
}
stream.Write(buffer, offset, count);
if (echo != null) echo.Write(buffer, offset, count);
}
public PipeReader Input { get; }
public PipeWriter Output { get; }
}
#endif
}
......@@ -421,13 +421,24 @@ internal void Write(RedisChannel channel)
internal void Write(RedisValue value)
{
if (value.IsInteger)
switch(value.Type)
{
WriteUnified(_ioPipe.Output, (long)value);
}
else
{
WriteUnified(_ioPipe.Output, (byte[])value);
case RedisValue.StorageType.Null:
WriteUnified(_ioPipe.Output, (byte[])null);
break;
case RedisValue.StorageType.Int64:
WriteUnified(_ioPipe.Output, (long)value);
break;
case RedisValue.StorageType.Double: // use string
case RedisValue.StorageType.String:
WriteUnified(_ioPipe.Output, null, (string)value);
break;
case RedisValue.StorageType.Raw:
WriteUnified(_ioPipe.Output, ((ReadOnlyMemory<byte>)value).Span);
break;
default:
throw new InvalidOperationException($"Unexpected {value.Type} value: '{value}'");
}
}
......@@ -492,7 +503,7 @@ static void WriteCrlf(PipeWriter writer)
span[1] = (byte)'\n';
writer.Advance(2);
}
private static int WriteRaw(Span<byte> span, long value, bool withLengthPrefix = false, int offset = 0)
internal static int WriteRaw(Span<byte> span, long value, bool withLengthPrefix = false, int offset = 0)
{
if (value >= 0 && value <= 9)
{
......@@ -594,16 +605,24 @@ internal void WakeWriterAndCheckForThrottle()
static readonly byte[] NullBulkString = Encoding.ASCII.GetBytes("$-1\r\n"), EmptyBulkString = Encoding.ASCII.GetBytes("$0\r\n\r\n");
private static void WriteUnified(PipeWriter writer, byte[] value)
{
const int MaxQuickSpanSize = 512;
// ${len}\r\n = 3 + MaxInt32TextLen
// {value}\r\n = 2 + value.Length
if (value == null)
{
// special case:
writer.Write(NullBulkString);
}
else if (value.Length == 0)
else
{
WriteUnified(writer, new ReadOnlySpan<byte>(value));
}
}
private static void WriteUnified(PipeWriter writer, ReadOnlySpan<byte> value)
{
// ${len}\r\n = 3 + MaxInt32TextLen
// {value}\r\n = 2 + value.Length
const int MaxQuickSpanSize = 512;
if (value.Length == 0)
{
// special case:
writer.Write(EmptyBulkString);
......@@ -611,7 +630,8 @@ private static void WriteUnified(PipeWriter writer, byte[] value)
else if (value.Length <= MaxQuickSpanSize)
{
var span = writer.GetSpan(5 + MaxInt32TextLen + value.Length);
int bytes = WriteUnified(span, value);
span[0] = (byte)'$';
int bytes = WriteUnified(span, value, 1);
writer.Advance(bytes);
}
else
......@@ -619,7 +639,7 @@ private static void WriteUnified(PipeWriter writer, byte[] value)
// too big to guarantee can do in a single span
var span = writer.GetSpan(3 + MaxInt32TextLen);
span[0] = (byte)'$';
int bytes = WriteRaw(span, value.LongLength, offset: 1);
int bytes = WriteRaw(span, value.Length, offset: 1);
writer.Advance(bytes);
writer.Write(value);
......@@ -636,13 +656,18 @@ private static int WriteUnified(Span<byte> span, byte[] value, int offset = 0)
}
else
{
offset = WriteRaw(span, value.Length, offset: offset);
new ReadOnlySpan<byte>(value).CopyTo(span.Slice(offset, value.Length));
offset += value.Length;
offset = WriteCrlf(span, offset);
offset = WriteUnified(span, new ReadOnlySpan<byte>(value), offset);
}
return offset;
}
private static int WriteUnified(Span<byte> span, ReadOnlySpan<byte> value, int offset = 0)
{
offset = WriteRaw(span, value.Length, offset: offset);
value.CopyTo(span.Slice(offset, value.Length));
offset += value.Length;
offset = WriteCrlf(span, offset);
return offset;
}
internal void WriteSha1AsHex(byte[] value)
{
......
......@@ -2484,7 +2484,9 @@ internal override void WriteImpl(PhysicalConnection physical)
}
else
{ // recognises well-known types
physical.Write(RedisValue.Parse(arg));
var val = RedisValue.TryParse(arg);
if (val.IsNull && arg != null) throw new InvalidCastException($"Unable to parse value: '{arg}'");
physical.Write(val);
}
}
}
......
......@@ -9,9 +9,6 @@ class Program
{
static int Main()
{
var options = PipeOptions.Default;
Console.WriteLine(options.PauseWriterThreshold);
Console.WriteLine(options.ResumeWriterThreshold);
var s = new StringWriter();
try
{
......@@ -38,8 +35,8 @@ static int Main()
}
finally
{
// Console.WriteLine();
// Console.WriteLine(s);
Console.WriteLine();
//Console.WriteLine(s);
}
}
......
......@@ -6,13 +6,17 @@
<LangVersion>latest</LangVersion>
</PropertyGroup>
<PropertyGroup Condition="'$(Computername)'=='OCHO' or '$(Computername)'=='SKINK'">
<LocalReference>true</LocalReference>
</PropertyGroup>
<ItemGroup>
<ProjectReference Include="..\StackExchange.Redis\StackExchange.Redis.csproj" />
</ItemGroup>
<ItemGroup Condition="'$(Computername)'=='OCHO'">
<ItemGroup Condition="'$(LocalReference)'=='true'">
<ProjectReference Include="..\..\Pipelines.Sockets.Unofficial\src\Pipelines.Sockets.Unofficial\Pipelines.Sockets.Unofficial.csproj" />
</ItemGroup>
<ItemGroup Condition="'$(Computername)'!='OCHO'">
<ItemGroup Condition="'$(LocalReference)'!='true'">
<PackageReference Include="Pipelines.Sockets.Unofficial" Version="0.2.1-alpha.48" />
</ItemGroup>
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment