Saturday, February 25, 2012

TPL aware read/write streams for WCF streamed responses

I recently needed to use the WCF streamed response approach on a project, with one caveat: constructing the data to be streamed for a single request was relatively slow, and the result could be large enough to exhaust the available physical memory of any one server in a cluster. Given that multiple requests could be 'in flight' at once, a more reserved implementation seemed apposite.

An obvious mechanism became apparent. If the stream returned from the WCF service call was, server side, a read/write stream, I could connect the producer of data with the consumer (the WCF framework, ultimately connected to the requesting client) and use standard synchronization primitives to control behaviour. That way, data is provided to the client as soon as it is available, and 'arbitrary' amounts of data can be dispatched.

This translates to the WCF service call returning a System.IO.Stream, as expected, with the actual runtime type being my own implementation of System.IO.Stream: the read/write stream.

Given that the producer of the data is 'slow' (meaning some latency greater than epsilon milliseconds), the consumer needs to be signalled when data is available, or it would expend time spinning.

So, create a read/write stream, controlled by an auto reset event, that both the consumer and producer use. In this instance, the consumer is WCF, and the producer is some particular task that is associated with the stream wrapper (an Action in fact). Calling Open() on the wrapper returns a Stream object, which is in turn returned from the WCF call to the client. That stream is written to asynchronously, and responds as necessary to WCF demands (typically, Read(,,)). In this case, Read() depends on an AutoResetEvent (see the implementation).
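To make the signalling idea concrete before the real implementation, here is a minimal sketch of a consumer blocking on an AutoResetEvent until a slow producer task signals it. The class name SignalDemo, the 100 ms delay and the 5 second timeout are illustrative assumptions, not part of the actual implementation.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical demo type, for illustration only.
public static class SignalDemo {
  public static string Run() {
    var dataReady = new AutoResetEvent(false);
    string payload = null;

    // Producer: simulates slow work, publishes a value, then signals.
    Task producer = Task.Factory.StartNew(() => {
      Thread.Sleep(100);
      payload = "first chunk";
      dataReady.Set();
    });

    // Consumer: blocks (no spinning) until signalled or the timeout elapses.
    if (!dataReady.WaitOne(TimeSpan.FromSeconds(5)))
      throw new TimeoutException("Producer did not signal in time.");

    producer.Wait();
    return payload;
  }
}
```

The consumer thread sleeps inside WaitOne rather than polling, which is the property the stream's Read() relies on.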

It seems appropriate to commence with the stream implementation. It makes concrete the abstract class System.IO.Stream, and delegates read and write actions to a specialized 'buffer' type object, which uses the AutoResetEvent to ensure that read and write events occur as and when necessary (or reasonable). Most of this is filler, to properly implement the abstract class, the key methods being Read/Write.

    // Requires: using System.IO;
    // DBC is a helper type that is not shown in this post.
    public class ReadWriteStream : Stream {
      private bool mComplete;
      public ReadWriteStream() : this(0) {
      }
      public ReadWriteStream(int syncBufferCapacity) {
        SyncBuffer = new SynchronizedBuffer(syncBufferCapacity);
      }
      // Set to true by the producer once it has written everything.
      public bool Complete {
        get {
          return mComplete;
        }
        set {
          mComplete = SyncBuffer.Complete = value;
        }
      }
      public override bool CanRead {
        get { return true; }
      }
      public override bool CanSeek {
        get { return false; }
      }
      // The producer writes to this stream, so report it as writable
      // (wrappers such as StreamWriter check this before accepting a stream).
      public override bool CanWrite {
        get { return true; }
      }
      public override void Flush() {
      }
      public override long Length {
        get {
          return DBC.AssertNotImplemented<long>("This stream does not support the Length property.");
        }
      }
      public override long Position {
        get {
          return DBC.AssertNotImplemented<long>("This stream does not support getting the Position property.");
        }
        set {
          DBC.AssertNotImplemented<long>("This stream does not support setting the Position property.");
        }
      }
      // Called by the consumer (WCF); may block until data arrives.
      public override int Read(byte[] buffer, int offset, int count) {
        return SyncBuffer.Read(buffer, offset, count);
      }
      public override long Seek(long offset, SeekOrigin origin) {
        return DBC.AssertNotImplemented<long>("This stream does not support seeking");
      }
      public override void SetLength(long value) {
        DBC.AssertNotImplemented<int>("This stream does not support setting the Length.");
      }
      // Called by the producer task.
      public override void Write(byte[] buffer, int offset, int count) {
        SyncBuffer.Write(buffer, offset, count);
      }
      // Only release the buffer once the producer is done and all data is read.
      public override void Close() {
        if (!SyncBuffer.ContentAvailable && SyncBuffer.Complete)
          SyncBuffer.Close();
      }
      private SynchronizedBuffer SyncBuffer { get; set; }
    }

Well, that's a lot of content, but the only interesting parts are the construction, Close() and Read(..) and Write(..). And Read/Write both use the SynchronizedBuffer concrete type - this is thus shown below:

    // Requires: using System; using System.IO; using System.Threading;
    // LogFacade is a logging helper that is not shown in this post.
    public class SynchronizedBuffer {

     // TODO: This needs to be derived from the binding of the service calling this - can't find a way yet
     private const int DefaultTimeOut = 15000;
     private readonly AutoResetEvent mBufferEvent = new AutoResetEvent(false);
     private bool mComplete;

     internal SynchronizedBuffer(int syncBufferCapacity) {
      TimeOut = DefaultTimeOut;
      InitialBufferCapacity = syncBufferCapacity;
      Buffer = new MemoryStream(syncBufferCapacity);
     }

     private MemoryStream Buffer { get; set; }

     private int InitialBufferCapacity { get; set; }

     internal int Read(byte[] buffer, int offset, int count) {

      // Loop rather than test once: the event may still be signalled by a
      // write whose data has already been consumed, and returning 0 before
      // Complete is set would tell the reader that the stream has ended.
      while (!Complete && !ContentAvailable) {
       if (!mBufferEvent.WaitOne(TimeOut)) {
        LogFacade.LogError(this, "Going to abend on auto reset event after timeout");
        throw new ApplicationException("Timed out waiting for auto reset event");
       }
      }

      int cnt;
      lock (Buffer) {
       cnt = Buffer.Read(buffer, offset, count);
       // Compact once the stream has grown past its initial capacity.
       if (Buffer.Length > InitialBufferCapacity)
        Resize();
      }
      return cnt;
     }

     internal void Write(byte[] buffer, int offset, int count) {
      lock (Buffer) {
       // Append at the end, then restore the reader's position.
       long currentReadPosition = Buffer.Position;
       Buffer.Seek(0, SeekOrigin.End);
       Buffer.Write(buffer, offset, count);
       Buffer.Seek(currentReadPosition, SeekOrigin.Begin);
       mBufferEvent.Set();
      }
     }

     // 'Preserve unread': move the unread tail to the start and truncate.
     // Must be called while holding the lock on Buffer.
     private void Resize() {
      long currentPosition = Buffer.Position;
      long unread = Buffer.Length - currentPosition;
      if (unread <= 0L)
       unread = 0L;
      else {
       byte[] slice = new byte[unread];
       Buffer.Read(slice, 0, (int)unread);
       Buffer.Seek(0L, SeekOrigin.Begin);
       Buffer.Write(slice, 0, (int)unread);
       Buffer.Seek(0L, SeekOrigin.Begin);
      }
      Buffer.SetLength(unread);
     }

     internal int TimeOut { get; set; }

     internal bool Complete {
      get {
       return mComplete;
      }
      set {
       mComplete = value;
       // Wake a waiting reader so that it can observe completion.
       if (mComplete)
        mBufferEvent.Set();
      }
     }

     internal bool ContentAvailable {
      get {
       lock (Buffer) {
        return Buffer.Length - Buffer.Position > 0;
       }
      }
     }

     internal void Close() {
      if (!ContentAvailable) {
       Buffer.Dispose();
       Buffer = null;
      }
     }
    }

This very concrete type uses a memory stream to support read/write operations. The auto reset event is signalled by the write operation, and rather obviously, waited on by the read operation. A simple 'lock' statement preserves thread safety. The only point of interest is that of course multiple writes may occur before a read executes - which means that the memory stream is more difficult to control in terms of capacity. In the case of slowly consuming clients, this would need further thought - the current implementation uses a simplistic 'preserve unread' strategy.
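For the slow consumer case, one possible direction (not what the implementation above does) is to bound the buffer so that the producer blocks when it gets too far ahead. The sketch below swaps in the framework's BlockingCollection to show the idea; the class name BoundedBufferDemo, the capacity of 4 chunks and the 1 KB chunk size are assumptions chosen purely for illustration.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

// Hypothetical demo type, for illustration only.
public static class BoundedBufferDemo {
  public static int Run() {
    // At most 4 chunks may be queued; Add blocks the producer beyond that.
    using (var chunks = new BlockingCollection<byte[]>(boundedCapacity: 4)) {
      Task producer = Task.Factory.StartNew(() => {
        try {
          for (int i = 0; i < 100; i++)
            chunks.Add(new byte[1024]);
        }
        finally {
          // Tells the consumer that no more chunks will arrive.
          chunks.CompleteAdding();
        }
      });

      // Consumer: drains chunks until the producer has completed.
      int total = 0;
      foreach (byte[] chunk in chunks.GetConsumingEnumerable())
        total += chunk.Length;

      producer.Wait();
      return total;
    }
  }
}
```

With a bound in place, memory use is capped at roughly capacity times chunk size per request, at the cost of stalling the producer whenever the client reads slowly.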

Now we need to allow this read/write stream to be used by some arbitrary producer of data - our consumer is known to us, being the WCF infrastructure. So I defined a 'wrapper' class, which could do with renaming I admit, that accepts an Action that will be spun off in a TPL task - writing to the stream that the wrapper, well, wraps.

1:   public class ReadWriteStreamWrapper {  
2:    
3:    public ReadWriteStreamWrapper(Action<object> producer = null) {  
4:     Producer = producer;  
5:    }  
6:    
7:    public ReadWriteStreamWrapper SetSyncCapacity(int capacity = 0) {  
8:     BufferCapacity = capacity;  
9:     return this;  
10:    }  
11:    
12:    private int BufferCapacity { get; set; }  
13:    
14:    public ReadWriteStream Open() {  
15:     Stream = new ReadWriteStream(BufferCapacity);  
16:     AssociatedTask = Task.Factory.StartNew(Producer, Stream);  
17:     return Stream;  
18:    }  
19:     
20:    public ReadWriteStream Open<T>() where T : SinkWriter, new() {  
21:     Producer = sink => {  
22:            T writer = new T {   
23:               Sink = sink as ReadWriteStream   
24:            };  
25:            writer.Process();  
26:           };  
27:     return Open();  
28:    }   
29:    
30:    private Action<object> Producer { get; set; }  
31:    
32:    private ReadWriteStream Stream { get; set; }  
33:    
34:    public Task AssociatedTask { get; private set; }  
35:   }  

Points of note:

  • Lines 7-10: Fluent style method that sets the capacity of the underlying stream if desired
  • Lines 14-18: The 'business', so to speak. Create a ReadWriteStream, create a task with the identified producer Action and supply the stream as the state passed to the Action. Finally, return the stream object, which will be the 'return' value for WCF

A WCF operation implementation would thus create a wrapper object, supply a producer Action, and return the object returned by Open(). When WCF attempts to read from the stream (our read/write stream) it will wait on the auto reset event, and one would hope that before the wait times out, the associated producer (running in a TPL task) actually writes some data to the stream, which sets the auto reset event and allows a waiting read to proceed. All very simple in actuality.
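The state-passing step in Open() can be seen in isolation in the following sketch: Task.Factory.StartNew hands its second argument to the Action<object> as its parameter. A plain MemoryStream stands in for the read/write stream here, and the name StateDemo is hypothetical.

```csharp
using System;
using System.IO;
using System.Text;
using System.Threading.Tasks;

// Hypothetical demo type, for illustration only.
public static class StateDemo {
  public static string Run() {
    // Stand-in for the ReadWriteStream that the wrapper creates.
    var target = new MemoryStream();

    // The producer receives the stream as untyped task state.
    Action<object> producer = state => {
      var sink = (Stream)state;
      byte[] bytes = Encoding.UTF8.GetBytes("hello");
      sink.Write(bytes, 0, bytes.Length);
    };

    // Same overload the wrapper uses: StartNew(Action<object>, object).
    Task task = Task.Factory.StartNew(producer, target);
    task.Wait();

    return Encoding.UTF8.GetString(target.ToArray());
  }
}
```

In the real wrapper the caller does not wait on the task; the stream is returned immediately and the reader blocks as needed.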

You might notice a reference to a 'SinkWriter' - this is an abstract convenience class (note to self: review semantics of utterances - makes it appear as if the 'convenience' is abstract, rather than the class!). It's shown below for completeness:

    public abstract class SinkWriter {

     public ReadWriteStream Sink { get; set; }

     public void Process() {
      try {
       Execute();
      }
      catch (Exception ex) {
       LogFacade.LogError(this, "Sink Writer failure", ex);
      }
      finally {
       // Always mark the stream complete so that a waiting reader is released.
       Sink.Complete = true;
      }
     }

     protected abstract void Execute();

    }


Here is a pro-forma WCF operation implementation to illustrate tying it all together:

    public Stream CreateStreamedResponse(Request req) {
      return new ReadWriteStreamWrapper(sink => {
          IDocumentGenerator gen =
            DocumentGenerationFacade.CreateGenerator(req);
          new DelegatingSinkWriter(gen, req) {
            Sink = sink as ReadWriteStream
          }.Process();
        })
        .SetSyncCapacity(ServiceConstants.SynchronizedBufferSize)
        .Open();
    }

We define the operation using the form expected for a streamed response; that is, by returning the stream object directly from the call. The wrapper is given a producer Action that drives some implementation of a sink writer (it doesn't matter awfully what that is for the sake of this blog), which, as discussed, writes to the read/write stream it is handed, with WCF acting in the role of consumer. We set the capacity according to some configuration, and then call Open(), which will create the read/write stream, start the producer task, and let matters proceed.
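For completeness, a service contract matching that operation might look roughly like the sketch below. The contract name IDocumentService and the shape of Request (including its DocumentId member) are assumptions, since the post shows neither; the relevant point is only that the operation returns a Stream as its entire response body.

```csharp
using System.IO;
using System.Runtime.Serialization;
using System.ServiceModel;

// Hypothetical request type; the real one is not shown in this post.
[DataContract]
public class Request {
  [DataMember]
  public string DocumentId { get; set; }
}

// Hypothetical contract name.
[ServiceContract]
public interface IDocumentService {
  // For a streamed response, the Stream is the whole response body.
  [OperationContract]
  Stream CreateStreamedResponse(Request req);
}
```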

One minor point - configuration. You need to ensure the binding for the WCF server changes the transfer mode - my simple example below:

    <bindings>
      <basicHttpBinding>
        <binding name="StreamBinding"
                 transferMode="StreamedResponse"
                 sendTimeout="00:00:15"
                 maxBufferSize="2048"/>
      </basicHttpBinding>
    </bindings>

This implementation has been 'battle' tested, and holds up well under pressure. And what is missing? If anyone reads this, it will be obvious! Care to really control a memory stream?
