SignalWise Integration Guide: Connecting to Near Real-Time Alert Stream


This guide will walk you through integrating with Bridgewise's SignalWise, our near real-time alert engine. Think of SignalWise as a constant stream of market events. Your job is to subscribe to this stream, listen for the events relevant to you, and use them in your application.

We use Apache Kafka to publish these event streams. The process is straightforward:

  1. Fetch the schema for your desired topic using the Schema Registry client (provided by the Confluent Kafka package). Note: the credentials will be provided by Bridgewise customer support.

  2. Subscribe to one or more "topics" (e.g., stock alerts, crypto alerts). Note: the topic's schema is supplied to the consumer as configuration when you subscribe.

  3. Receive the incoming alert messages, which are structured as simple JSON objects.


Step 1: Setting Up Your Kafka Connection

To get started, you'll need to configure a Kafka consumer. A "consumer" is just a piece of your application that listens for messages from our Kafka stream.

Bridgewise customer support will provide you with the following credentials:

  • Kafka Hostname (KAFKA_HOSTNAME)

  • Kafka Username (KAFKA_USERNAME)

  • Kafka Password (KAFKA_PASSWORD)

  • Schema Registry URL (schema_registry_url)

  • Schema Registry Key/Secret (basic.auth.user.info)

We've linked the official documentation below, but the full Python example in Step 4 is the most practical starting point; a quick connectivity check follows these links.

  • Confluent Kafka Clients: Link

  • Using Kafka in AWS Lambda: Link
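
Before writing any consumer logic, it can be worth confirming that the credentials work. The sketch below is our own minimal check (not part of the Bridgewise deliverables): it reads the values above from environment variables and lists the topics visible to your account using the AdminClient bundled with confluent-kafka.

# connectivity_check.py -- a minimal sketch; the environment variable names are our own convention
import os

from confluent_kafka.admin import AdminClient

conf = {
    "bootstrap.servers": os.environ["KAFKA_HOSTNAME"],
    "sasl.username": os.environ["KAFKA_USERNAME"],
    "sasl.password": os.environ["KAFKA_PASSWORD"],
    "security.protocol": "SASL_SSL",
    "sasl.mechanisms": "PLAIN",
}

admin = AdminClient(conf)
metadata = admin.list_topics(timeout=10)  # raises KafkaException if the broker cannot be reached
print("Connected. Topics visible to this account:")
for topic in sorted(metadata.topics):
    print(" -", topic)

If the script prints the topic names from Step 2, your credentials and network access are in order.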



Step 2: Subscribing to Alert Topics

We categorize alerts into different topics based on asset class. You can subscribe to one or more of these depending on your needs; a small helper for mapping asset classes to topic names follows the table.

Asset Class        Topic Name
Stocks + ETFs      daily-insights-bw-external
Commodities        daily-insights-bw-external-commodity
Crypto             daily-insights-bw-external-crypto
Forex              daily-insights-bw-external-forex
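
Topic names are passed to the consumer's subscribe() call (the full setup is shown in Step 4). If it helps, you can keep the mapping above in a small lookup so application code refers to asset classes instead of raw topic strings; the dictionary and helper below are our own convention, not part of the Bridgewise API.

# Map asset classes to SignalWise topic names (the keys are our own naming convention)
ASSET_CLASS_TOPICS = {
    "stocks_etfs": "daily-insights-bw-external",
    "commodities": "daily-insights-bw-external-commodity",
    "crypto": "daily-insights-bw-external-crypto",
    "forex": "daily-insights-bw-external-forex",
}

def topics_for(*asset_classes: str) -> list[str]:
    """Return the Kafka topic names for the requested asset classes."""
    return [ASSET_CLASS_TOPICS[asset_class] for asset_class in asset_classes]

# Example: one consumer (configured as in Step 4) listening to stocks/ETFs and crypto
# consumer.subscribe(topics_for("stocks_etfs", "crypto"))

Keep in mind that each topic has its own registered schema, so if you combine topics in a single consumer the deserializer must match every one of them; running one consumer per topic is often simpler.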





Step 3: Understanding the Alert Message (JSON Schema)

Every message you receive from a topic will be a JSON object. Here’s a breakdown of a sample alert for a stock, with explanations for the key fields.

Sample JSON Message:

{
  "identifier": "24893",
  "companyId": 24893,
  "tradingItemId": 2590144,
  "eventAmount": 1,
  "eventAttributes": {
    "assetClass": "Stock",
    "assetThresholdPrice": "0.0",
    "category": "Price Event",
    "assetChangePercent": -6.3,
    "language": "en-US",
    "listOfSymbols": [ "ANSS" ],
    "eventSentiment": "Negative",
    "eventImportanceLevel": 2,
    "subject_en-US": "ANSYS is dropping 6.3% today! Go see ANSS for more details.",
    "subject_ja-JP": "ANSSが本日6.1%下落中!詳細はANSSをチェック!.",
    "subject_es-ES": "¡ANSYS está bajando 6.3% hoy! Consulta ANSS para más detalles.",
    "title_en-US": "Sharp Move Down",
    "title_ja-JP": "急激な下落",
    "triggerPrice": 326.31
  },
  "eventDate": "2025-05-28 19:40:05.989797 UTC",
  "eventStatus": "New",
  "eventType": "AggressiveMoveIdentificationAlertLow",
  "isPublicEvent": true,
  "uniqueEventId": "6f85f394-bae8-4ead-a18a-3e3cfbcffc59"
}


Key Fields Explained:

  • uniqueEventId: Use this to uniquely identify each alert and prevent processing duplicates (see the handling sketch after this list).

  • identifier, companyId, tradingItemId: Internal Bridgewise identifiers for the asset.

  • eventDate: The UTC timestamp of when the event occurred.

  • eventAttributes: An object containing the core details of the alert.

    • assetClass: The type of asset (e.g., "Stock", "Crypto").

    • assetChangePercent: The percentage change that triggered the alert.

    • eventSentiment: "Positive" or "Negative".

    • subject_{language}: The full alert text, localized. For example, subject_en-US is the English version, and subject_ja-JP is the Japanese version.

    • title_{language}: A short, localized title for the alert.
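
As a small illustration of how these fields fit together, the sketch below de-duplicates alerts on uniqueEventId, parses eventDate, and picks a localized subject with an English fallback. The in-memory set and the helper names are our own; in production you would likely persist seen IDs in a database or cache.

from datetime import datetime, timezone

seen_event_ids = set()  # our own in-memory store; persist this in real deployments

def parse_event_date(value: str) -> datetime:
    """Parse eventDate strings such as '2025-05-28 19:40:05.989797 UTC' (assumes this exact format)."""
    return datetime.strptime(value.removesuffix(" UTC"), "%Y-%m-%d %H:%M:%S.%f").replace(tzinfo=timezone.utc)

def localized_subject(alert: dict, language: str = "en-US") -> str:
    """Return subject_{language} from eventAttributes, falling back to the English text."""
    attrs = alert.get("eventAttributes", {})
    return attrs.get(f"subject_{language}", attrs.get("subject_en-US", ""))

def handle_alert(alert: dict) -> None:
    event_id = alert["uniqueEventId"]
    if event_id in seen_event_ids:
        return  # duplicate delivery; already processed
    seen_event_ids.add(event_id)
    occurred_at = parse_event_date(alert["eventDate"])
    print(occurred_at.isoformat(), localized_subject(alert, "ja-JP"))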

Step 4: Example Consumer Code (Python)

Here is a practical Python example to get you started. It uses the confluent-kafka library.

Note: This example handles schema-based deserialization and common consumer errors, making it a solid starting point for your own integration.

# You'll need to install the library first (the [json] extra pulls in the jsonschema
# dependency used by JSONDeserializer):
# pip install "confluent-kafka[json]"

from confluent_kafka import DeserializingConsumer, KafkaException, KafkaError
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.json_schema import JSONDeserializer

# --- 1. Configuration: Fill in your credentials here ---
# Your Bridgewise customer support contact will provide all the following values.
KAFKA_HOSTNAME = "<your_kafka_host_name>"
KAFKA_USERNAME = "<your_kafka_username>"
KAFKA_PASSWORD = "<your_kafka_password>"
SCHEMA_REGISTRY_URL = "<your_schema_registry_url>"
SCHEMA_REGISTRY_KEY = "<your_schema_registry_key>"
SCHEMA_REGISTRY_SECRET = "<your_schema_registry_secret>"

# The topic you want to subscribe to.
# For stocks and ETFs, use: "daily-insights-bw-external"
TOPIC_NAME = "daily-insights-bw-external"

# --- 2. Set up Schema Registry and Deserializer ---
# The schema registry tells the consumer what structure the JSON messages have.
schema_registry_client = SchemaRegistryClient({
    "url": SCHEMA_REGISTRY_URL,
    "basic.auth.user.info": f"{SCHEMA_REGISTRY_KEY}:{SCHEMA_REGISTRY_SECRET}"
})

# Get the latest schema registered for our topic (subjects are named "<topic>-value")
subject_name = TOPIC_NAME + "-value"
latest_schema = schema_registry_client.get_latest_version(subject_name)

# This deserializer decodes each message from Kafka into a Python dictionary
json_deserializer = JSONDeserializer(latest_schema.schema.schema_str, from_dict=lambda x, ctx: x)

# --- 3. Kafka Consumer Configuration ---
# This dictionary holds all the connection settings.
consumer_conf = {
    "bootstrap.servers": KAFKA_HOSTNAME,
    "sasl.username": KAFKA_USERNAME,
    "sasl.password": KAFKA_PASSWORD,
    "security.protocol": "SASL_SSL",
    "sasl.mechanisms": "PLAIN",
    "group.id": "my_application_consumer_group_1",  # A unique name for your consumer group
    "auto.offset.reset": "latest",                  # Start reading from the newest messages
    "value.deserializer": json_deserializer         # Use the deserializer we configured
}

# --- 4. Main Application Logic ---
# Create and subscribe the consumer
consumer = DeserializingConsumer(consumer_conf)
consumer.subscribe([TOPIC_NAME])

print(f"Subscribed to '{TOPIC_NAME}'. Waiting for messages...")

try:
    while True:
        # Poll for new messages, waiting up to one second
        msg = consumer.poll(timeout=1.0)

        if msg is None:
            # No message received within the timeout
            continue

        if msg.error():
            # Handle potential errors
            if msg.error().code() == KafkaError._PARTITION_EOF:
                # End of partition event, not an error.
                continue
            else:
                raise KafkaException(msg.error())

        # If we get here, we have a valid message!
        # msg.value() is already a Python dictionary thanks to the deserializer.
        alert_data = msg.value()

        # Now you can use the data in your application
        print(f"Received Alert! ID: {alert_data.get('uniqueEventId')}")
        print(f"  -> Title (EN): {alert_data['eventAttributes'].get('title_en-US')}")
        print(f"  -> Subject (EN): {alert_data['eventAttributes'].get('subject_en-US')}\n")

except KeyboardInterrupt:
    print("Stopping consumer...")
finally:
    # Cleanly close the connection
    consumer.close()
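
The consumer above auto-commits offsets in the background (the confluent-kafka default). If you want at-least-once processing, where an offset is committed only after your application has durably handled the alert, you can disable auto-commit and commit explicitly. A minimal sketch, assuming the same consumer_conf and TOPIC_NAME as above and a handle_alert() function like the one sketched in Step 3:

# At-least-once processing: commit an offset only after the alert has been handled.
consumer_conf["enable.auto.commit"] = False   # standard librdkafka setting
consumer = DeserializingConsumer(consumer_conf)
consumer.subscribe([TOPIC_NAME])

try:
    while True:
        msg = consumer.poll(timeout=1.0)
        if msg is None or msg.error():
            continue                           # skip empty polls and errors for brevity
        handle_alert(msg.value())              # your processing logic
        consumer.commit(message=msg)           # mark this message as processed
finally:
    consumer.close()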




