
Thursday, July 30, 2020

Azure Cosmos DB - Partition keys


General
Just come out of a gig where Cosmos DB was the persistence engine of choice, using the SQL API. If you don't know much about Cosmos, see here.

Partition keys
One of the architectural decisions made by Microsoft confuses me. They have a notion of a logical partition, which has a maximum size of 10GB. Any non-trivial use of Cosmos DB therefore has to arrange for objects/documents to be spread across multiple logical partitions.

Therein lies the rub. Cosmos DB won't do any of this partitioning for you; it is entirely up to you to arrive at a satisfactory scheme, which means your implementation has to generate a partition key that ideally reflects the guidelines Microsoft publishes.

For the domain I was in, a number of external organisations submitted many documents to the client; these submissions were distributed over a number of years and easily exceeded the 10GB logical partition limit. One of the key guidelines from Microsoft is to avoid 'hot partitions' - that is, a partition that gets used heavily to the exclusion of almost all others. This has quite serious performance implications.

So, given we don't want hot partitions, that rules out a partition key based on the year of a submission, as there is strong locality of reference in play - the external organisations tend to focus on the most recent activity, and hence Cosmos traffic would concentrate on a single partition for a year at a time!

In the end, knowing that each external organisation had a unique 'organisation number', an effective partitioning approach was implemented using a sequence/modulo scheme. Its operation is simple, and works as below (a code sketch of the key generation follows the list):

  • An external organisation submits a JSON document via a REST API
  • On receipt, a Cosmos stored document is found or created based on the organisation number
  • This document has a sequence number and a modulo. We calculate sequence mod modulo.
  • We increment the sequence, and save the organisation specific document
  • We now have a pro forma partition key; for organisation 7633, we might have 7633-1, 7633-2 and so on
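
To make the scheme concrete, here is a minimal sketch of the key generation step. The type and member names are invented for illustration and are not the client's actual implementation:

  // Illustrative sketch only - names are invented; the real per-organisation document
  // would carry other data besides the sequence and modulo.
  public class OrganisationPartitionState {
    public string OrganisationNumber { get; set; }  // e.g. "7633"
    public long Sequence { get; set; }              // incremented on every submission
    public int Modulo { get; set; }                 // bounds the number of logical partitions
  }

  public static class PartitionKeyFactory {
    // Produces keys of the form "7633-<bucket>", where bucket = sequence mod modulo.
    public static string NextKey(OrganisationPartitionState state) {
      long bucket = state.Sequence % state.Modulo;
      state.Sequence++;   // the caller saves the updated organisation document
      return string.Format("{0}-{1}", state.OrganisationNumber, bucket);
    }
  }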

This gives a bounded, yet not meaningfully limited, number of logical partitions per organisation. With judicious selection of the modulo (a simple integer in my client's case), scalability is "assured".

Saturday, February 25, 2012

TPL aware read/write streams for WCF streamed responses

I've recently needed to employ the WCF streamed response approach for a project. A caveat applied however - constructing the data to be streamed for a single request was relatively slow, and had the potential to be of sufficient magnitude to compromise the available physical memory of any one server in a cluster. And given that multiple requests could be 'in flight', a more reserved implementation seemed apposite.

An obvious mechanism became apparent. If the stream returned from the WCF service call was, server side, a read/write stream, I could connect the producer of data with the consumer (WCF framework, ultimately connected to the requesting client) and utilize standard synchronization primitives to control behaviour. Meaning, of course, we attempt to provide data to the client as soon as it is available and allow for 'arbitrary' amounts of data to be dispatched.

This translates to the WCF service call returning a System.IO.Stream, as expected, with the actual type being (therefore) an implementation of System.IO.Stream - my read/write stream.

Given the proposition that the producer of data to be consumed is 'slow' (where that means, some latency other than epsilon milliseconds), I'd need to allow for the consumer to be signalled that data was available - or it would expend time spinning.

So, create a read/write stream, controlled by an auto reset event, that both the consumer and producer use. In this instance, the consumer is WCF; the producer is some particular task associated with the stream wrapper (an Action in fact). The wrapper, on calling Open(), returns a Stream object that is handed back from the WCF call to the client, is asynchronously written to, and responds as necessary to WCF demands (typically Read(buffer, offset, count)). In this case, Read() is dependent on an AutoResetEvent (see the implementation).

It seems appropriate to commence with the stream implementation. It makes concrete the abstract class System.IO.Stream, and delegates read and write actions to a specialized 'buffer' type object, which uses the AutoResetEvent to ensure that read and write events occur as and when necessary (or reasonable). Most of this is filler, to properly implement the abstract class, the key methods being Read/Write.

  public class ReadWriteStream : Stream {
    private bool mComplete;

    public ReadWriteStream() : this(0) {
    }

    public ReadWriteStream(int syncBufferCapacity) {
      SyncBuffer = new SynchronizedBuffer(syncBufferCapacity);
    }

    // Set by the producer when it has finished writing; propagated to the buffer.
    public bool Complete {
      get {
        return mComplete;
      }
      set {
        mComplete = SyncBuffer.Complete = value;
      }
    }

    public override bool CanRead {
      get { return true; }
    }

    public override bool CanSeek {
      get { return false; }
    }

    // Presented to WCF as read-only; the producer still writes via Write(..) below.
    public override bool CanWrite {
      get { return false; }
    }

    public override void Flush() {
    }

    public override long Length {
      get {
        return DBC.AssertNotImplemented<long>("This stream does not support the Length property.");
      }
    }

    public override long Position {
      get {
        return DBC.AssertNotImplemented<long>("This stream does not support getting the Position property.");
      }
      set {
        DBC.AssertNotImplemented<long>("This stream does not support setting the Position property.");
      }
    }

    // Reads are delegated to the synchronized buffer, which may block until data arrives.
    public override int Read(byte[] buffer, int offset, int count) {
      return SyncBuffer.Read(buffer, offset, count);
    }

    public override long Seek(long offset, SeekOrigin origin) {
      return DBC.AssertNotImplemented<long>("This stream does not support seeking");
    }

    public override void SetLength(long value) {
      DBC.AssertNotImplemented<int>("This stream does not support setting the Length.");
    }

    // Writes are delegated to the synchronized buffer, which signals any waiting reader.
    public override void Write(byte[] buffer, int offset, int count) {
      SyncBuffer.Write(buffer, offset, count);
    }

    public override void Close() {
      if (!SyncBuffer.ContentAvailable && SyncBuffer.Complete)
        SyncBuffer.Close();
    }

    private SynchronizedBuffer SyncBuffer { get; set; }
  }

Well, that's a lot of content, but the only interesting parts are the construction, Close() and Read(..) and Write(..). And Read/Write both use the SynchronizedBuffer concrete type - this is thus shown below:

  public class SynchronizedBuffer {

    // TODO: This needs to be derived from the binding of the service calling this - can't find a way yet
    private const int DefaultTimeOut = 15000;
    private readonly AutoResetEvent mBufferEvent = new AutoResetEvent(false);
    private bool mComplete;

    internal SynchronizedBuffer(int syncBufferCapacity) {
      TimeOut = DefaultTimeOut;
      InitialBufferCapacity = syncBufferCapacity;
      Buffer = new MemoryStream(syncBufferCapacity);
    }

    private MemoryStream Buffer { get; set; }

    private int InitialBufferCapacity { get; set; }

    // Blocks (up to TimeOut) until the producer signals that content is available,
    // unless the producer has already completed or unread content remains.
    internal int Read(byte[] buffer, int offset, int count) {

      if (!Complete && !ContentAvailable &&
        !mBufferEvent.WaitOne(TimeOut)) {
        LogFacade.LogError(this, "Going to abend on auto reset event after timeout");
        throw new ApplicationException("Timed out waiting for auto reset event");
      }

      int cnt;
      lock (Buffer) {
        cnt = Buffer.Read(buffer, offset, count);
        if (Buffer.Length > InitialBufferCapacity)
          Resize();
      }
      return cnt;
    }

    // Appends to the end of the buffer, restores the read position and signals any waiting reader.
    internal void Write(byte[] buffer, int offset, int count) {
      lock (Buffer) {
        long currentReadPosition = Buffer.Position;
        Buffer.Seek(0, SeekOrigin.End);
        Buffer.Write(buffer, offset, count);
        Buffer.Seek(currentReadPosition, SeekOrigin.Begin);
        mBufferEvent.Set();
      }
    }

    // Discards consumed bytes by copying any unread remainder to the start of the buffer.
    private void Resize() {
      long currentPosition = Buffer.Position;
      long unread = Buffer.Length - currentPosition;
      if (unread <= 0L)
        unread = 0L;
      else {
        byte[] slice = new byte[unread];
        Buffer.Read(slice, 0, (int)unread);
        Buffer.Seek(0L, SeekOrigin.Begin);
        Buffer.Write(slice, 0, (int)unread);
        Buffer.Seek(0L, SeekOrigin.Begin);
      }
      Buffer.SetLength(unread);
    }

    internal int TimeOut { get; set; }

    // Set by the producer when no further writes will occur; releases any waiting reader.
    internal bool Complete {
      get {
        return mComplete;
      }
      set {
        mComplete = value;
        if (mComplete)
          mBufferEvent.Set();
      }
    }

    internal bool ContentAvailable {
      get {
        lock (Buffer) {
          return Buffer.Length - Buffer.Position > 0;
        }
      }
    }

    internal void Close() {
      if (!ContentAvailable) {
        Buffer.Dispose();
        Buffer = null;
      }
    }
  }

This very concrete type uses a memory stream to support read/write operations. The auto reset event is signalled by the write operation, and rather obviously, waited on by the read operation. A simple 'lock' statement preserves thread safety. The only point of interest is that of course multiple writes may occur before a read executes - which means that the memory stream is more difficult to control in terms of capacity. In the case of slowly consuming clients, this would need further thought - the current implementation uses a simplistic 'preserve unread' strategy.

Now we need to allow this read/write stream to be used by some arbitrary producer of data - our consumer is known to us, being the WCF infrastructure. So I defined a 'wrapper' class, which could do with renaming I admit, that accepts an Action that will be spun off in a TPL task - writing to the stream that the wrapper, well, wraps.

1:   public class ReadWriteStreamWrapper {  
2:    
3:    public ReadWriteStreamWrapper(Action<object> producer = null) {  
4:     Producer = producer;  
5:    }  
6:    
7:    public ReadWriteStreamWrapper SetSyncCapacity(int capacity = 0) {  
8:     BufferCapacity = capacity;  
9:     return this;  
10:    }  
11:    
12:    private int BufferCapacity { get; set; }  
13:    
14:    public ReadWriteStream Open() {  
15:     Stream = new ReadWriteStream(BufferCapacity);  
16:     AssociatedTask = Task.Factory.StartNew(Producer, Stream);  
17:     return Stream;  
18:    }  
19:     
20:    public ReadWriteStream Open<T>() where T : SinkWriter, new() {  
21:     Producer = sink => {  
22:            T writer = new T {   
23:               Sink = sink as ReadWriteStream   
24:            };  
25:            writer.Process();  
26:           };  
27:     return Open();  
28:    }   
29:    
30:    private Action<object> Producer { get; set; }  
31:    
32:    private ReadWriteStream Stream { get; set; }  
33:    
34:    public Task AssociatedTask { get; private set; }  
35:   }  

Points of note:

  • Lines 7-10: Fluent style method that sets the capacity of the underlying stream if desired
  • Lines 14-18: The 'business', so to speak. Create a ReadWriteStream, create a task with the identified producer Action, and supply the stream as the state passed to the Action. Finally, return the stream object, which will be the 'return' value for WCF

A WCF operation implementation would thus create a wrapper object, supply a producer Action, and return the object returned by Open(). When WCF attempts to read from the stream (our read/write stream) it will wait on the auto reset event, and one would hope that before the wait times out, the associated producer (running in a TPL task) actually writes some data to the stream - which sets the auto reset event, allowing a waiting read to proceed. All very simple in actuality.
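
Outside of WCF, the producer/consumer interplay is easy to exercise. The rough harness below is purely illustrative (it is not part of the production code) - it spins up a deliberately slow producer via the wrapper and plays the consumer role on the current thread:

  // Rough test harness only. Assumes: using System; using System.IO;
  // using System.Text; using System.Threading;
  public static void ExerciseReadWriteStream() {
    var wrapper = new ReadWriteStreamWrapper(state => {
      ReadWriteStream sink = (ReadWriteStream)state;
      for (int i = 0; i < 10; i++) {
        byte[] chunk = Encoding.UTF8.GetBytes(string.Format("Chunk {0}{1}", i, Environment.NewLine));
        sink.Write(chunk, 0, chunk.Length);   // each write signals the auto reset event
        Thread.Sleep(250);                    // simulate a 'slow' producer
      }
      sink.Complete = true;                   // no more data - releases any waiting read
    });

    ReadWriteStream stream = wrapper.SetSyncCapacity(1024).Open();
    byte[] buffer = new byte[256];
    int read;
    // Keep reading until the producer has completed and the buffer is drained.
    do {
      read = stream.Read(buffer, 0, buffer.Length);
      if (read > 0)
        Console.Write(Encoding.UTF8.GetString(buffer, 0, read));
    } while (read > 0 || !stream.Complete);
    stream.Close();
  }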

You might notice a reference to a 'SinkWriter' - this is a convenient abstract base class. It's shown below for completeness:

  public abstract class SinkWriter {

    public ReadWriteStream Sink { get; set; }

    public void Process() {
      try {
        Execute();
      }
      catch (Exception ex) {
        LogFacade.LogError(this, "Sink Writer failure", ex);
      }
      finally {
        Sink.Complete = true;
      }
    }

    protected abstract void Execute();
  }


Here is a pro-forma WCF operation implementation to illustrate tying it all together:

  public Stream CreateStreamedResponse(Request req) {
    return new ReadWriteStreamWrapper(sink => {
      IDocumentGenerator gen =
        DocumentGenerationFacade.CreateGenerator(req);
      new DelegatingSinkWriter(gen, req) {
        Sink = sink as ReadWriteStream
      }.Process();
    })
    .SetSyncCapacity(ServiceConstants.SynchronizedBufferSize)
    .Open();
  }

We define the operation using the form expected for a streamed response; that is, by returning the stream object directly from the call. The wrapper takes some implementation of a sink writer (it doesn't matter awfully what that is for the sake of this blog), which, as discussed, accepts a read/write stream that it will write to, with WCF acting in the role of consumer. We set the capacity according to some configuration, and then call Open, which will create the RW stream, start the producer task - and let matters proceed.

One minor point - configuration. You need to ensure the binding for the WCF server changes the transfer mode - my simple example below:

  <bindings>
    <basicHttpBinding>
      <binding name="StreamBinding"
               transferMode="StreamedResponse"
               sendTimeout="00:00:15"
               maxBufferSize="2048"/>
    </basicHttpBinding>
  </bindings>

This implementation has been 'battle' tested, and holds up well under pressure. And what is missing? If anyone reads this, it will be obvious! Care to really control a memory stream?

Friday, August 19, 2011

Adaptive contracts for WCF

Or WCF in the raw! With my current project, I decided it would be useful to let clients request only the objects/data they will actually consume, rather than impose a specific data contract. From a solutions architecture perspective, a set of WCF services are publicly exposed for consumption, accepting and returning JSON (for reasons of brevity, given predicted limited bandwidth).

So how best to do this? Well, some research turned up this MSDN blog post... and WCF raw it shall be. I also examined OData in depth, and concluded that it was still immature for production use and required too much effort to make it truly lean.

So what does a client 'see' or 'do'? From my perspective, a client should tell me what 'domain' of objects it is interested in, and potentially indicate what 'zone' is of concern. For the domain of objects, it should be possible to note the 'name' of the object property of interest, and how the client wants to refer to it. For example, I might be interested in a composite (navigated) property of x.y.z, but I want to refer to it as q. Again, this is a way of minimising bandwidth consumption. It also assumes that the spectrum of possible navigation routes is well known.

All code formatted using this excellent tool: http://codeformatter.blogspot.com/2009/06/about-code-formatter.html

So an implementation uses an XML file to capture this information, an abbreviated example is shown next:

 <?xml version="1.0" encoding="utf-8" ?>  
 <Templates>  
   <TemplateCollection zone="smartphone">  
     <Template name="Accounts">  
       <Field key="Name"/>  
       <Field key="CustomisedName"/>  
       <Field key="Number"/>  
       <Field key="FinancialData.Balance.Value"   
                  returnKey="Balance"/>  
       <Field key="FinancialData.Balance"   
                   returnKey="FormattedBalance"/>  
     </Template>  
   </TemplateCollection>  
 </Templates>  

And here is a JSON request a client might make:

 {"BespokeRequest":  
   { "Specification":   
    { "Zone":"smartphone", "Name":"Accounts" }  
   }  
 }  

So it's quite obvious that 'zone' identifies a collection of domain templates - and this request is interested in the 'Accounts' domain (as shown in the XML excerpt). For the object type that comprises this domain, there are obviously a number of properties in existence, and the 'smartphone' variant template just uses what it needs. This excerpt shows the composite property mapping in use, taking a navigation specification and returning it as a simple property:

 <Field key="FinancialData.Balance.Value"   
        returnKey="Balance"/>  

So the client wants to reference FinancialData.Balance.Value as Balance. However, as the point of this exercise is to allow arbitrary specification, this is also supported, using a JSON request similar to the following:

 {"BespokeRequest":  
  { "Specification":   
   { "Name":"Accounts" },   
  {"Definitions": [  
   { "Key":"Name","Value":""},  
   { "Key":"Number","Value":""},  
   { "Key":"FinancialData.Balance.Value","Value":"Balance"},  
   { "Key":"CustomisedName","Value":""},  
   { "Key":"FinancialData.Balance.Value","Value":"FormattedBalance"}  
  ]  
  }  
 }  

If a 'Value' is null or empty, the value of 'Key' in the associated object will have a JSON name that is the same as the 'Key' string.
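
The projection machinery itself is left for part II, but to illustrate the intent, here is a simplified sketch of resolving a dotted key against an object graph via reflection, using the FieldSet contract shown further down. This is illustrative code only, not the actual implementation:

  // Illustrative sketch only - not the production projection code.
  // Assumes: using System.Collections.Generic; using System.Reflection;
  public static class FieldProjector {

    // Emits each requested field under its 'Value' (returnKey), or under the 'Key'
    // itself when 'Value' is null or empty.
    public static IDictionary<string, object> Project(object source, IEnumerable<FieldSet> fields) {
      var result = new Dictionary<string, object>();
      foreach (FieldSet field in fields) {
        string jsonName = string.IsNullOrEmpty(field.Value) ? field.Key : field.Value;
        result[jsonName] = Resolve(source, field.Key);
      }
      return result;
    }

    // Walks a dotted navigation path such as "FinancialData.Balance.Value".
    private static object Resolve(object current, string dottedPath) {
      foreach (string segment in dottedPath.Split('.')) {
        if (current == null)
          return null;
        PropertyInfo property = current.GetType().GetProperty(segment);
        if (property == null)
          return null;    // unknown navigation segment
        current = property.GetValue(current, null);
      }
      return current;
    }
  }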

The rationale for the template approach is to minimise bandwidth consumption as far as possible for in house applications.

Of course we have to be able to represent this in a WCF service. For raw service methods, the return type is System.IO.Stream - which signals to the host container that the service implementation is handling the 'formatting' of the response. In my case, a number of services will be 'adaptive', so there is a mixin interface, as below:

   [ServiceContract]  
   public interface IAdaptiveService {  
     [WebInvoke(Method = "POST",   
          UriTemplate = "BespokeRequest",   
          ResponseFormat = WebMessageFormat.Json,  
          RequestFormat = WebMessageFormat.Json,   
          BodyStyle = WebMessageBodyStyle.Wrapped)]  
     [OperationContract]  
     [return: MessageParameter(Name = "BespokeResponse")]  
     Stream BespokeRequest(  
             [MessageParameter(Name = "BespokeRequest")]  
             BespokeObjectListRequest request);  
   }  

Technically, specifying the response format is unnecessary, as raw services have complete control of this aspect. The message parameter attribute is useful though, allowing a generic name to be used for the actual request being passed in.

And here are the related data contracts and supporting objects - noting that, where it makes sense, data members are allowed to be absent on deserialization - this again means fewer bytes being sent 'over the wire'.

   [DataContract(Namespace = "Integration")]  
   public class BespokeObjectListRequest {  
     [DataMember(IsRequired = true)]  
     public TemplateSpecification Specification { get; set; }  
     [DataMember(IsRequired = false)]  
     public List<FieldSet> Definitions { get; set; }  
   }  
   
   [DataContract(Namespace = "Integration")]  
   public class FieldSet {  
     [DataMember]  
     public string Key { get; set; }  
     [DataMember(IsRequired = false)]  
     public string Value { get; set; }  
   }  
   
   [DataContract(Namespace = "Integration")]  
   public class TemplateSpecification {  
     [DataMember]  
     public string Zone { get; set; }  
     [DataMember]  
     public string Name { get; set; }  
   
     public bool IsValid() {  
       return !String.IsNullOrWhiteSpace(Zone)   
                  && !String.IsNullOrWhiteSpace(Name);  
     }  
   }  

A base type provides the implementation of the IAdaptiveService interface, shown below. The implementation uses a variant of the centralized exception handling that I described in a previous post.

  public Stream BespokeRequest(BespokeObjectListRequest request) {
    return RawExecute<MemoryStream>(GenericReponseName,
      new WrappingProcessor(() => {
        DBC.AssertNotNull(request, "Null request can't be interrogated");
        DBC.AssertNotNull(request.Specification,
          "Null request specification can't be interrogated");
        DBC.Assert(request.Specification.IsValid(), "The request specification is invalid");
        string targetMethod = string.Format("Bespoke{0}",
          request.Specification.Name);
        MethodInfo info = GetType().GetMethod(targetMethod,
          BindingFlags.NonPublic | BindingFlags.Instance);
        DBC.AssertNotNull(info, string.Format("Method not found - {0}", targetMethod));
        return (IProcessor) info.Invoke(this, null);
      },
      request));
  }

The RawExecute implementation is shown next:

  protected T RawExecute<T>(string baseName, IProcessor handler)
    where T : Stream, new() {
    WebOperationContext.Current.OutgoingResponse.ContentType = JSONContentType;
    T stream = new T();
    AssociationsWrapper wrapper = new AssociationsWrapper(baseName);
    try {
      CheckSessionStatus();
      CheckOrganizationActive();
      wrapper.Associations = handler.Process();
    }
    catch (Exception ex) {
      LogFacade.LogFatal(this, "CAS failed", ex);
      wrapper.Success = false;
    }
    finally {
      wrapper.Write(stream);
    }
    return stream;
  }

There is a part II to this post that provides more detail of interest.

Thursday, July 7, 2011

Software quorums

I've been thinking recently of the notion of software quorums and the opportunity that such a concept might present for an open source project (purely on a hobby basis). Sure, there are commercial opportunities for such a software suite, but this will be an 'intellectual' venture if you will - for the sheer joy of creating such a set of collaborating frameworks and their attendant implementations.

I suppose in some ways I can appeal to graph theory to provide a conceptual basis for a design, but that is a little way off (and scribbled on bits of paper and 'badly drawn' Visio diagrams).

So I've settled initially on at least defining some concrete base requirements:

  • Self discovering: multicast discovery of reachable nodes, preferably driven by convention rather than configuration
  • Fail over of 'broken' nodes: core requirement
  • Fail back support: core requirement
  • Elections and voting: Nodes may participate in elections, voting a 'prime' node in or out, perhaps based on their performance in terms of correctness
  • Adjudication: Ability of nodes to create a 'committee' to adjudicate on a tie, assuming that the number of nodes is not odd
  • Dead heat support: When dealing with shared resources, deadlock prevention 
  • Graceful removal: Nodes need to be able to retire gracefully (before potentially being re-animated)
  • Split brain detection: An infamous scenario; many possible solutions exist, each with their own advantages and disadvantages
  • Roving agents: A bit of a '90s throwback this; agents to 'move' among nodes, sampling or perhaps providing support where needed
  • Message repository: core
  • System messages versus application messages: Differentiation between system level (quorum administration messages) and higher level, implementation messages
  • Systems operation support: integration into SNMP/SCOM and so on
  • HTML 5 graphical manager: A GUI for quorum management/query/analysis
  • Priority support: Allowing priorities to drive reaction to and treatment of messages (of any type)
  • Implementation: patterns and best practice based

I've also arrived at a rather limp name for the endeavour:

Software quorum, adjudications, tallying and elections


As an acronym therefore: SOQRATES.


Groan.

Wednesday, April 20, 2011

User think time and asynchrony

Sometimes the simple idea is the right one to use. At my place of employment we have a complex and highly visible (ASP.NET) web application, and I have been the architect and technical lead for it since its inception. Now, to use this application, you first have to authenticate (using a standard user name and password arrangement), and will then be challenged with a form of two factor authentication.

Once all this has been negotiated successfully, the web application displays a summary (cf. welcome) page containing details of your relationship with the organization. What is displayed is not important in terms of detail, but it is important to note that there can be significant latency when constructing the domain objects that are needed. Sure, we employ lazy loading and AJAX judiciously, but there are other options. And significant latency here means anywhere up to 5 or 6 seconds for clients with complex or many relationships - which is a massive time span in Internet terms.

This is where a simple piece of lateral thinking was helpful (I should disclose that Alex Herzog was involved in the original discussions, and was most insightful). I gathered some statistics relating to user think time at the 2FA stage - and was astonished to see an average 'user think' of more than 10 seconds at the 2FA challenge. Now, this does not mean the 2FA needed to be redesigned; it was (and still is) accepted as sufficient for our purposes, and there was no appetite for any action in this area. But look at that temporal window - it's almost double the domain object construction time for the home page - it seemed almost criminal not to use it.

So the idea formed that this window could be used to 'fork' a one way web service call, that invoked the domain object build machinery to perform necessary object construction. It can't be done sooner than the 2FA challenge, as we need to basically authenticate a user before we devote any server resources to them. Also, executing as a one way service means that we don't have a session to manipulate - the place where our domain objects are normally stored. But we do have a distributed cache available, and by dint of interfaces, are able to have a session that behaves like the real thing but is distributed cache resident.

All that needs to be done to connect this together for the home page is some simple presenter logic (the web application is MVP based) in the presenter associated with the 'authenticated' master page, to transfer objects from the distributed cache to the (now available) session - and it's done. Of course, there are exceptions and errors to consider, and those are handled, but the details are not especially relevant here.
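
No code accompanied this post, so the sketch below is only meant to convey the shape of the approach - every type and member name in it is invented for illustration (the real implementation is specific to our WCF services and distributed cache):

  // Illustrative sketch only - all names are invented.
  // Assumes: using System.ServiceModel; using System.Web.SessionState;
  // 1. A one-way WCF operation, fired once primary authentication succeeds, builds the
  //    expensive domain objects while the user is busy with the 2FA challenge.
  [ServiceContract]
  public interface ICustomerGenesisService {
    [OperationContract(IsOneWay = true)]
    void BeginGenesis(string customerId);
  }

  public class CustomerGenesisService : ICustomerGenesisService {
    public void BeginGenesis(string customerId) {
      CustomerSummary summary = DomainObjectBuilder.BuildSummary(customerId);
      // No session is available in a one-way call, so park the results in the distributed cache.
      DistributedCache.Put(customerId, summary);
    }
  }

  // 2. Presenter logic on the 'authenticated' master page promotes the cached objects into
  //    the now-available session; a cache miss simply falls back to building on demand.
  public void PromoteGenesisResults(string customerId, HttpSessionState session) {
    CustomerSummary summary = DistributedCache.Get<CustomerSummary>(customerId);
    if (summary != null)
      session["CustomerSummary"] = summary;
  }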

It's simple, almost elegant, and allows the welcome page to remain relatively trivial and with an excellent performance profile. Other options were considered, but this approach remains in use today - the 'customer genesis handler'.