To be able to send events to the EventHub, we need an EventHub Client as part of our Infrastructure layer. This implementation is part of an EventProducer used by an Azure Function. I’m using a .Net 6 Class Library Project to create the Client named MyService.EventHubClient.
Under the Domain Layer, add an Interface to be implemented by the EventHubService.
public interface IHrEventHubService
{
Task SendToEventHub(EventDataWrapper eventDataWrapper);
}
The EventDataWrapper contains the required serialized data to be shared between different services in a Microservice architecture. The data can be serialized/deserialized using Newtonsoft.json approach or Avro Serializer with Schema Registry. The EventDataWrapper is exposing the required properties over the domain models and sending the data to the Event Hub as the Producer.
Back to the EventHubClient Project, create EventHubClientBuilder as below:
public static class EventHubClientBuilder
{
public static EventHubProducerClient Build(string connectionString, string eventHubName)
{
return new EventHubProducerClient(connectionString, eventHubName);
}
}
The above class is like a Factory to return the EventHubProducerClient object and will be created on Startup.
Now, create the EventHubService that will implement the Interface created in the domain layer above:
public class EventHubService : IHrEventHubService
{
private readonly EventHubProducerClient _eventHubProducerClient;
public EventHubService(EventHubProducerClient eventHubProducerClient)
{
this._eventHubProducerClient = eventHubProducerClient;
}
public async Task SendToEventHub(EventDataWrapper eventDataWrapper)
{
using var eventBatch = await _eventHubProducerClient.CreateBatchAsync();
EventData messageEventData;
if (eventDataWrapper.SerializerType == SerializationType.Avro)
{
messageEventData = eventDataWrapper.AvroSerializedEventData;
messageEventData?.Properties.Add("ContentType", messageEventData.ContentType);
messageEventData?.Properties.Add("SerializationType", "Avro");
}
else
{
messageEventData = new EventData(Encoding.UTF8.GetBytes(eventDataWrapper.Message));
messageEventData.Properties.Add("SerializationType", "Other");
}
messageEventData?.Properties.Add("EventType", eventDataWrapper.EventType);
if (!eventBatch.TryAdd(messageEventData))
{
throw new Exception($"Event is too large for the batch and cannot be sent.");
}
await _eventHubProducerClient.SendAsync(eventBatch);
}
}
Use the default DI in the Startup.cs class of an Azure Function to get the EventHubService object as below:
public class Startup : FunctionsStartup
{
public override void Configure(IFunctionsHostBuilder builder)
{
//...
builder.Services.AddSingleton(factory =>
{
var eventHubClient = EventHubClientBuilder.Build(appConfiguration.EventProducerEventHubConnectionString,
appConfiguration.EventProducerEventHub);
IHrEventHubService hrEventHubClient = new EventHubService(eventHubClient);
return hrEventHubClient;
});
//...
}
}
This is just an example of creating the EventHubProducerClient, however you can change the serialization and EventDataWrapper implementation as required.