CAP theorem – the challenge
Imagine we have thousands upon thousands of alerts (telemetry data) generated by IoT devices, and we want to collect them for analysis, reporting, or to create support tickets from the data we receive. These alerts are sent over a network protocol such as HTTPS, SMTP, TCP/IP or AMQP, and there will always be some latency in receiving them. The CAP theorem states that it is impossible for a distributed data store to provide more than 2 out of these 3 guarantees:
- Consistency: Every read receives the most recent write or an error
- Availability: Every request receives a (non-error) response, without the guarantee that it contains the most recent write
- Partition tolerance: The system continues to operate despite an arbitrary number of messages being dropped (or delayed) by the network between nodes
You can think of a distributed data store as a destination endpoint such as Azure Event Hubs, Functions or Service Bus queues/topics. No matter which distributed data store we use, one of the three guarantees will be missing, but by combining different platforms and technologies we can cover all of them. Let’s have a look at what we can use in Azure. For a full list of Azure services and where to use them, please refer to my other post on this topic.
Event Hubs
For the IoT alerts scenario, we are going to face an issue of consistency. If you have used Kafka before, you probably know this has been an issue there too: messages arrive out of order, delayed, or are even lost as they travel over the network. This has more to do with the message delivery itself than with the component the messages arrive at, but when it happens, the whole system becomes unreliable. Using Event Hubs, we can ensure that messages arrive sequentially, but only within a single partition; we have no way to tell whether they arrive in order across partitions. If we only use one partition in an event hub, ordering is solved, but that defeats the purpose of using Event Hubs in the first place: it is designed to ingest and deliver thousands of messages with millisecond latency across partitions, with capacity scaled in throughput units.
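As a side note, here is a minimal sketch (not from the original solution) of how a publisher could keep per-device ordering by using the device ID as the partition key, so every alert from one device lands in the same partition. The hub name "input-messages" is borrowed from the function shown later; everything else is illustrative.

using System.Text;
using System.Threading.Tasks;
using Azure.Messaging.EventHubs;
using Azure.Messaging.EventHubs.Producer;

public static class OrderedAlertPublisher
{
    public static async Task PublishAsync(string connectionString, string deviceId, string jsonAlert)
    {
        await using var producer = new EventHubProducerClient(connectionString, "input-messages");

        // Using the device ID as the partition key routes every alert from this
        // device to the same partition, where Event Hubs guarantees ordering.
        var options = new CreateBatchOptions { PartitionKey = deviceId };
        using EventDataBatch batch = await producer.CreateBatchAsync(options);
        batch.TryAdd(new EventData(Encoding.UTF8.GetBytes(jsonAlert)));

        await producer.SendAsync(batch);
    }
}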
Service Bus queues/topics
Sending messages to a single-partition event hub or a Service Bus queue/topic can solve the ordering problem, but imagine sending thousands of messages into a Service Bus queue/topic: now the problem is availability. Messages wait in a queue, much like the line you see while checking in your luggage at the airport, and are handled one after the other. What happens when many different devices generate alerts at the same time (and they most likely will)?
Azure Functions
Using Azure Functions as the destination can solve the availability issue, because multiple instances of a function can process messages at the same time, but the consistency issue remains. Delayed arrival of messages can result in setting the wrong state for a device at the destination. For example, a device generates an alert at 3:00 AM with state = failed, and one minute later it generates another with state = working. If these messages arrived in order, there would be nothing to worry about, but if they somehow arrive out of order (because of a network failure or delay), say the 3:01 AM message arrives before the 3:00 AM one, we would set the device’s state incorrectly. To overcome these issues, we use the above in combination with Azure Stream Analytics.
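To make that failure mode concrete, here is a hedged sketch of the kind of last-writer-wins check a consumer would otherwise need, applying a state change only if it is newer than the one already stored. The types and names are illustrative, not from the original solution; the rest of this post solves the problem upstream with Stream Analytics instead.

using System;

public class DeviceState
{
    public string DeviceId { get; set; }
    public string State { get; set; }
    public DateTimeOffset LastChangeUtc { get; set; }
}

public static class StateUpdater
{
    // Returns true if the alert was applied, false if it was stale.
    public static bool TryApply(DeviceState current, string newState, DateTimeOffset timeOfStateChange)
    {
        if (timeOfStateChange <= current.LastChangeUtc)
        {
            // The 3:01 AM "working" alert is already applied; drop the late 3:00 AM "failed" one.
            return false;
        }

        current.State = newState;
        current.LastChangeUtc = timeOfStateChange;
        return true;
    }
}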
Azure Stream Analytics
I won’t discuss what Azure Stream Analytics is in detail because it is documented clearly in the Microsoft docs. I will, however, show how I’m going to use it to address the CAP trade-offs mentioned at the beginning.
What we have achieved so far:
- Availability – event hubs and functions
- Partition tolerance – event hubs
So we need the consistency bit and we use Azure Stream Analytics just for that purpose.
Basically, we have an input of multiple data sources (ingesting messages), an analyzer (this is where we filter, sort and transform messages, for example from XML to JSON format) and an output for delivering messages to our destination endpoints (e.g. Power BI, a data warehouse, event hubs, queues, etc.).
We expect our IoT alert messages to arrive late or out of order. Here is the diagram of our solution:
Ingesting
Alerts are in XML format and are delivered to our Azure Function. We don’t need many functions to handle the transformation: the function’s main job is to take the message, convert it into the accepted JSON format and forward it to the event hub. A function can scale out to many instances, so one function is enough in this case.
You could have your IoT devices send messages straight to the input hub, but in my case the IoT devices can only send POST requests over HTTPS. There is no way to set request headers such as the Authorization header required to interact with the event hub. To overcome this, I use an HTTP-triggered Azure Function. IoT devices can then send POST requests to this function using a function key, which can be specified in the POST URL.
public static class MessagesDispatcher
{
    [FunctionName("MessagesDispatcher")]
    public static async Task<IActionResult> Run(
        [HttpTrigger(AuthorizationLevel.Function, "post", Route = null)] HttpRequest req,
        [EventHub("input-messages", Connection = "")] IAsyncCollector<string> outputEvents,
        ILogger log)
    {
        log.LogInformation("MessagesDispatcher function executed at {Date}", DateTimeOffset.Now);

        using var streamReader = new StreamReader(req.Body);
        string requestBody = await streamReader.ReadToEndAsync();

        if (!string.IsNullOrEmpty(requestBody))
        {
            try
            {
                // Deserialize the XML alert payload ("notification" is the class representing the alert schema),
                // convert it to JSON and forward it to the input event hub.
                var xmlSerializer = new XmlSerializer(typeof(notification));
                using var stringReader = new StringReader(requestBody);
                var data = (notification)xmlSerializer.Deserialize(stringReader);

                var json = JsonConvert.SerializeObject(data);
                await outputEvents.AddAsync(json);

                return new OkObjectResult(json);
            }
            catch (Exception ex)
            {
                return new BadRequestObjectResult(ex.Message);
            }
        }

        return new BadRequestObjectResult(requestBody);
    }
}
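For reference, a device would then POST its XML payload to a URL of the form https://<your-function-app>.azurewebsites.net/api/MessagesDispatcher?code=<function-key> (the host name and key here are placeholders).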
If you have control over how messages can be delivered, I suggest sending them straight to the event or IoT hub.
The role of the input hub is to provide a stream input to our analyzer. I have a single Event Hubs namespace (with 1 throughput unit) containing 2 hubs, input and output, as you can tell. Each has 32 partitions and 2 consumer groups, development and production. This number of partitions gives messages plenty of room to arrive in parallel. We consume production messages through the development consumer group without interfering with the “live” version, which allows both environments to run at the same time.
Analyzing
The main tasks for the analyzer are to filter, sort and group messages. Stream Analytics queries are written in a T-SQL-like syntax.
SELECT
    DeviceID,
    AffectedService,
    CollectTop(1000) OVER (ORDER BY CAST(TimeOfStateChange AS datetime) ASC) AS top1000
INTO
    [develop-output]
FROM
    [develop-input]
GROUP BY
    DeviceID,
    AffectedService,
    TumblingWindow(second, 5)
Here we collect up to 1000 events every 5 seconds using a tumbling window, ordered by the time the state changed. This covers the issue of delayed messages because results are only emitted every 5 seconds. We could increase the window if messages take too long to hit our function, or if the function takes longer to convert the message from XML to JSON (e.g. because of the payload size). This gives us one JSON result per DeviceID, with its events sorted by time, which we can then process.
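As a rough illustration, a downstream consumer might model one output message along these lines. The property names mirror the query above; the exact shape of the CollectTop payload should be verified against the Stream Analytics documentation, so the collected events are kept as raw JSON objects here.

using System.Collections.Generic;
using Newtonsoft.Json;
using Newtonsoft.Json.Linq;

public class GroupedAlerts
{
    [JsonProperty("DeviceID")]
    public string DeviceId { get; set; }

    [JsonProperty("AffectedService")]
    public string AffectedService { get; set; }

    // Each element carries one collected event, already sorted by TimeOfStateChange.
    [JsonProperty("top1000")]
    public List<JObject> Top1000 { get; set; }
}

// Usage: var grouped = JsonConvert.DeserializeObject<GroupedAlerts>(messageBody);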
To understand window functions, refer to this doc for more information.
Delivering
Once messages are clean and sorted, we can send them to the output event hub. Remember we have 32 partitions, and messages are grouped by DeviceID, sorted by date and distributed across all 32 partitions. This way we can have multiple endpoints (for example multiple instances of Azure Functions, or actors) listening to the same hub but only picking up the messages they want. If the number of AffectedService values is less than or equal to the number of partitions, we can use this field as the partition key and have our clients listen to a specific partition by that key. This is one way to do in-parallel processing, but it is not the only way.
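As an illustrative sketch (the hub name "output-messages" and the use of the development consumer group are assumptions based on the setup described earlier), a client pinned to a single partition could look roughly like this:

using System;
using System.Threading;
using System.Threading.Tasks;
using Azure.Messaging.EventHubs.Consumer;

public static class PartitionListener
{
    public static async Task ListenAsync(string connectionString, string partitionId, CancellationToken token)
    {
        await using var consumer = new EventHubConsumerClient("development", connectionString, "output-messages");

        // Read only the partition we care about, starting from the latest event.
        await foreach (PartitionEvent partitionEvent in
            consumer.ReadEventsFromPartitionAsync(partitionId, EventPosition.Latest, token))
        {
            string body = partitionEvent.Data.EventBody.ToString();
            Console.WriteLine($"Partition {partitionId}: {body}");
        }
    }
}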
We don’t need a partition key to scale our functions.
Let’s say we want to use many instances of a function to process those 1000 events; we can make an estimate based on that 5-second window. The goal here is to process messages in parallel, relative to the Device ID. Devices may not generate 1000 alerts every 5 seconds, so we may reduce that number to, say, 100 events per 5 seconds. When the volume exceeds what a single function instance can handle, the runtime automatically spins up new instances to handle the rest, as sketched below.
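For example, a minimal Event Hub-triggered function along these lines (the hub name, connection setting and consumer group are assumed placeholders) scales out automatically as the backlog grows, without any partition-key handling in our code:

using Microsoft.Azure.WebJobs;
using Microsoft.Extensions.Logging;

public static class AlertsProcessor
{
    [FunctionName("AlertsProcessor")]
    public static void Run(
        [EventHubTrigger("output-messages", Connection = "EventHubConnection", ConsumerGroup = "development")]
        string[] events,
        ILogger log)
    {
        foreach (string body in events)
        {
            // Each message is one DeviceID/AffectedService group, already sorted by time.
            log.LogInformation("Processing group: {Body}", body);
            // ... per-device processing goes here
        }
    }
}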
Final thoughts
Although this post mostly addressed the approach we take for our IoT scenarios in Azure, the overall goal remains the same in any distributed environment. The CAP theorem still holds, and we should consider it carefully when building our integration systems, especially in the world of distributed computing.
Messaging has long been a great solution for distributed environments, as described by Gregor Hohpe and Bobby Woolf in their book, Enterprise Integration Patterns, but dealing with it has never been easy.