Microsoft Fabric Real-Time Intelligence with Eventstream Custom Endpoints


Overview

Navigating the world of real-time data processing can be complex, but Microsoft Fabric's Real-Time Intelligence with Eventstream makes it surprisingly intuitive. This guide walks through a practical scenario: generating randomized humidity sensor data locally and transforming it in Eventstream, using custom endpoints as both the input and output sources.

System Architecture

💡
In this scenario, Fabric Eventstream plays a dual role, exposing endpoints both to receive and to send data.

The architecture consists of the following stages:

  1. Custom endpoint as the input data source: Sensor data is ingested via a custom endpoint, with events generated on the local machine via VS Code.

  2. Microsoft Fabric Eventstream: The data flows through transformations that refine it and enhance its usability.

  3. Derived stream as the output data source: Processed data is sent to a derived stream, which in turn acts as an endpoint for sending data onward.

  4. Eventhouse KQL Database: Data is stored for further analytics and historical tracking.

Prerequisites

  • A Fabric Eventstream configured in Microsoft Fabric (we need its event hub name and primary connection string for the producer code)

Step 1: Simulating Humidity Sensor Data

A producer script simulates sensor data and sends it to the custom endpoint; it runs locally in VS Code. As a best practice, the event hub name and primary connection string should be stored securely, for example in Azure Key Vault, and never hard-coded. For demonstration purposes, however, they appear inline here.
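
If you want to avoid hard-coding these values, a minimal sketch of pulling them from Azure Key Vault might look like the following. The vault URL and secret names here are placeholders of my own, not part of the original setup, and this assumes the azure-identity and azure-keyvault-secrets packages are installed.

# pip install azure-identity azure-keyvault-secrets
from azure.identity import DefaultAzureCredential
from azure.keyvault.secrets import SecretClient

# Placeholder vault URL and secret names -- replace with your own
VAULT_URL = "https://<your-key-vault-name>.vault.azure.net/"
secret_client = SecretClient(vault_url=VAULT_URL, credential=DefaultAzureCredential())

EVENTHUB_NAME = secret_client.get_secret("eventhub-name").value
CONNECTION_STR = secret_client.get_secret("eventhub-connection-string").value

The main script below keeps the values inline for clarity.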

# INSTALL azure-eventhub before running!
# pip command:
# pip install azure-eventhub
# Primary Stream

from azure.eventhub import EventHubProducerClient, EventData
from datetime import datetime, timezone
import hashlib
import random
import time
import json

# Replace the placeholders with your Event Hubs connection string and event hub name
EVENTHUB_NAME = 'es_d8ecd8c2-45c2-41e8-8f67-2d37e0c8112b'
CONNECTION_STR = 'Endpoint=sb:/'
# Configuration variables
MIN_HUMIDITY = 30.0          # Minimum humidity value
MAX_HUMIDITY = 60.0          # Maximum humidity value
COUNTRY = "Canada"           # Country
CITY = "Toronto"             # City
SLEEP_TIME = 5               # Sleep time before sending next event

# Example message
'''
{
    "country": "Canada",
    "city": "Toronto",
    "timestamp": "2025-01-25T12:36:03.826769+00:00",
    "humidity_readings": [
        {
            "sensor": "sensor_1",
            "humidity": "45.34"
        },
        {
            "sensor": "sensor_2",
            "humidity": "50.21"
        },
        {
            "sensor": "sensor_3",
            "humidity": "55.69"
        }
    ],
    "event_id": "88709af29e138d8d906e009a800ebaddacd46d89d20ec91151e2ab91557170c5"
}
'''

# Create a producer client to send messages to the event hub
producer = EventHubProducerClient.from_connection_string(conn_str=CONNECTION_STR, eventhub_name=EVENTHUB_NAME)

def generate_fake_humidity(min_humidity, max_humidity):
    """Simulate a fake humidity reading within the specified range or generate null."""
    return str(round(random.uniform(min_humidity, max_humidity), 2))

def get_random_sensor_readings(min_humidity, max_humidity):
    """Generate a list of humidity readings for random sensors."""
    sensors = ["sensor_1", "sensor_2", "sensor_3"]
    selected_sensors = random.sample(sensors, random.randint(1, len(sensors)))
    return [{"sensor": sensor, "humidity": generate_fake_humidity(min_humidity, max_humidity)}
            for sensor in selected_sensors]

def generate_event_id(payload):
    """Generate a SHA256 hash as a unique event ID."""
    hash_object = hashlib.sha256(json.dumps(payload, sort_keys=True).encode('utf-8'))
    return hash_object.hexdigest()

def get_current_timestamp():
    """Return the current timestamp in ISO 8601 format."""
    return datetime.now(timezone.utc).isoformat()

try:
    # Continuously generate and send fake humidity readings
    while True:
        # Create a batch.
        event_data_batch = producer.create_batch()

        # Generate random sensor readings
        humidity_readings = get_random_sensor_readings(MIN_HUMIDITY, MAX_HUMIDITY)

        # Create the payload
        payload = {
            "country": COUNTRY,
            "city": CITY,
            "timestamp": get_current_timestamp(),
            "humidity_readings": humidity_readings
        }

        # Generate an event_id
        payload["event_id"] = generate_event_id(payload)

        # Format the message as JSON
        message = json.dumps(payload)

        # Add the JSON-formatted message to the batch
        event_data_batch.add(EventData(message))

        # Send the batch of events to the event hub
        producer.send_batch(event_data_batch)

        print(json.dumps(json.loads(message), indent=4))
        print(event_data_batch)

        # Wait for a bit before sending the next reading
        time.sleep(SLEEP_TIME)
except KeyboardInterrupt:
    print("Stopped by the user")
except Exception as e:
    print(f"Error: {e}")
finally:
    # Close the producer
    producer.close()

Here’s a brief breakdown of the code snippet:

  • It generates random humidity readings between 30% and 60% for a random subset of up to three sensors.

  • The data includes location (Canada, Toronto) and a timestamp.

  • The payload is serialized into JSON and assigned a unique event ID using SHA-256 hashing.

  • The script continuously generates, sends, and prints this data every 5 seconds, simulating real-time sensor readings.

  • It runs in an infinite loop until manually stopped (KeyboardInterrupt).

  • The event hub name and primary connection string are obtained from the Eventstream's custom endpoint details, which is why we first create an Eventstream and then add a Custom Endpoint source.

The payload output printed in VS Code follows the example message format shown above.

Step 2: Transforming Data in Microsoft Fabric Eventstream

In this step, I will walk through the different transformations applied to the incoming Eventstream data.

2.1 Expanding Humidity Readings

This transformation expands the humidity_readings array into individual events using the Expand transformation, which creates a new row for each value within an array, producing events like the one below.

{
    "sensor": "sensor_1",
    "humidity": 45.5,
    "country": "Canada",
    "timestamp": "2025-02-07T12:00:00Z",
    "event_id": "<unique_hash>"
}
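
Expand is a no-code operation configured in the Eventstream editor; conceptually, it behaves like the Python sketch below. The function name flatten_readings is my own illustration, not Eventstream code.

# Conceptual sketch of Expand: one output row per array element
def flatten_readings(event: dict) -> list[dict]:
    base = {k: v for k, v in event.items() if k != "humidity_readings"}
    return [{**base, **reading} for reading in event["humidity_readings"]]

sample = {
    "country": "Canada",
    "city": "Toronto",
    "timestamp": "2025-02-07T12:00:00Z",
    "event_id": "<unique_hash>",
    "humidity_readings": [
        {"sensor": "sensor_1", "humidity": "45.34"},
        {"sensor": "sensor_2", "humidity": "50.21"},
    ],
}
for row in flatten_readings(sample):
    print(row)  # one flattened event per sensor reading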

2.2 Manage Fields Transformation

Column renames and data type changes are done in this step, producing events like the following.

{
    "sensor": "sensor_1",
    "humidity": 45.5,
    "country": "Canada",
    "event_id": "<unique_hash>"
}
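
As with Expand, this is configured visually in the Eventstream editor; a rough Python equivalent of the rename-and-cast logic might look like this (the function name manage_fields is illustrative):

# Conceptual sketch of the Manage Fields step: keep selected columns and cast types
def manage_fields(row: dict) -> dict:
    return {
        "sensor": row["sensor"],
        "humidity": float(row["humidity"]),  # cast the string reading to a float
        "country": row["country"],
        "event_id": row["event_id"],
    }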

2.3 Group by Transformation

We will add a new column containing the average humidity for each event_id. To achieve this, first perform a Group by operation and then left join the result back to the original stream, as sketched below.
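
Both operations are no-code steps in Eventstream; conceptually, the combined logic resembles this Python sketch (the helper name add_avg_humidity is my own):

from collections import defaultdict

# Conceptual sketch: average humidity per event_id, then left join back to each row
def add_avg_humidity(rows: list[dict]) -> list[dict]:
    sums, counts = defaultdict(float), defaultdict(int)
    for row in rows:
        sums[row["event_id"]] += row["humidity"]
        counts[row["event_id"]] += 1
    avg_by_event = {eid: sums[eid] / counts[eid] for eid in sums}
    # Left join: every original row keeps its fields and gains AVG_humidity
    return [{**row, "AVG_humidity": avg_by_event[row["event_id"]]} for row in rows]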

2.4 Left Join and Manage Fields Transformations

(A new AVG_humidity column has been added.)

The processed data is then routed to:

  1. A derived (secondary) stream, which acts as the input for another custom endpoint.

  2. An Eventhouse KQL database (not covered further here, since this post focuses on custom endpoints).

Step 3: Consuming the Derived/Secondary Stream via a Custom Endpoint

The following code block, executed in VS Code, retrieves the grouped and transformed data pushed through the secondary stream. As before, the new event hub name and connection string must be supplied.

# Secondary Stream
#  INSTALL azure-eventhub before running!
# pip install azure-eventhub

from azure.eventhub import EventHubConsumerClient
import json

# Replace the placeholders with your Event Hubs connection string and event hub name
EVENTHUB_NAME = 'des_cee26d55-449b-4c4e-a0dc-ed481eaed80c'
CONNECTION_STR = 'Endpoint=sb://.windows.net/;'
CONSUMER_GROUP = '$Default'  # Change if using a different consumer group

def on_event(partition_context, event):
    """Callback function to process received events."""
    try:
        # Decode event data
        event_data = json.loads(event.body_as_str())

        # Print received event
        print(json.dumps(event_data, indent=4))

        # Update checkpoint
        partition_context.update_checkpoint(event)
    except Exception as e:
        print(f"Error processing event: {e}")

# Create a consumer client
consumer = EventHubConsumerClient.from_connection_string(
    conn_str=CONNECTION_STR,
    consumer_group=CONSUMER_GROUP,
    eventhub_name=EVENTHUB_NAME
)

try:
    print("Listening for events...")
    # The context manager closes the client when the block exits
    with consumer:
        consumer.receive(
            on_event=on_event,
            starting_position="-1"  # Read from the beginning of the stream
        )
except KeyboardInterrupt:
    print("Stopped by user.")
except Exception as e:
    print(f"Error: {e}")
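
Note that update_checkpoint here keeps checkpoints in memory only, so this consumer re-reads from starting_position on every restart. For durable checkpoints you could plug in a blob-backed store; the sketch below assumes the azure-eventhub-checkpointstoreblob package and an Azure Storage account of your own.

# pip install azure-eventhub-checkpointstoreblob
from azure.eventhub import EventHubConsumerClient
from azure.eventhub.extensions.checkpointstoreblob import BlobCheckpointStore

# Placeholder storage connection string and container name -- replace with your own
checkpoint_store = BlobCheckpointStore.from_connection_string(
    "<storage-connection-string>", "<container-name>"
)

consumer = EventHubConsumerClient.from_connection_string(
    conn_str=CONNECTION_STR,
    consumer_group=CONSUMER_GROUP,
    eventhub_name=EVENTHUB_NAME,
    checkpoint_store=checkpoint_store,  # checkpoints persist across restarts
)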

Conclusion

Adding custom endpoints as inputs and outputs in Microsoft Fabric Eventstream provides greater flexibility for real-time data ingestion and distribution, making it easier to integrate with various systems and applications.

As an Input Source

  • Bring in real-time data from external systems, IoT devices, or third-party applications.

  • Connect to proprietary or industry-specific data sources that aren’t natively supported.

  • Ingest data from APIs, webhooks, or other event-driven sources.

As an Output Destination

  • Route transformed event data to external applications, analytics platforms, or databases.

  • Send data to business systems, dashboards, or machine learning models for further processing.

  • Integrate with third-party APIs or messaging services for real-time automation.

By using custom endpoints, you can seamlessly extend Eventstream's capabilities to fit your specific needs, ensuring your data flows where it's needed in real time.

Thanks for reading!
