This guide will walk you through integrating with Bridgewise's SignalWise, our near real-time alert engine. Think of SignalWise as a constant stream of market events. Your job is to subscribe to this stream, listen for the events relevant to you, and use them in your application.
We use Apache Kafka to publish these event streams. The process is straightforward:
Get the schema for the topic you want by using the Schema Registry client (included in the Confluent Kafka package). Note: Bridgewise customer support will provide the credentials.
Subscribe to one or more "topics" (e.g., stock alerts, crypto alerts). Note: when subscribing, pass the topic's schema to the consumer as part of its configuration.
Receive the incoming alert messages, which are structured as simple JSON objects.
Step 1: Setting Up Your Kafka Connection
To get started, you'll need to configure a Kafka consumer. A "consumer" is just a piece of your application that listens for messages from our Kafka stream.
Bridgewise customer support will provide you with the following credentials:
Kafka Hostname (KAFKA_HOSTNAME)
Kafka Username (KAFKA_USERNAME)
Kafka Password (KAFKA_PASSWORD)
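Schema Registry URL (SCHEMA_REGISTRY_URL)
Schema Registry Key and Secret (SCHEMA_REGISTRY_KEY, SCHEMA_REGISTRY_SECRET), which are combined as key:secret in the client's basic.auth.user.info setting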
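One way to keep these credentials out of your source code is to read them from environment variables. The following is a minimal sketch, not a Bridgewise requirement: the environment variable names and the load_settings helper are assumptions made for illustration.

```python
import os

# Illustrative names only; use whatever your deployment defines.
REQUIRED_VARS = (
    "KAFKA_HOSTNAME",
    "KAFKA_USERNAME",
    "KAFKA_PASSWORD",
    "SCHEMA_REGISTRY_URL",
    "SCHEMA_REGISTRY_KEY",
    "SCHEMA_REGISTRY_SECRET",
)


def load_settings(env=None):
    """Return the required settings as a dict, failing fast if any is missing."""
    env = os.environ if env is None else env
    missing = [name for name in REQUIRED_VARS if not env.get(name)]
    if missing:
        raise RuntimeError("Missing settings: " + ", ".join(missing))
    return {name: env[name] for name in REQUIRED_VARS}
```

Calling load_settings() once at startup surfaces a missing credential immediately, rather than as a connection error later.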
We've provided links to official documentation, but the Python example below is the most practical starting point.
Step 2: Subscribing to Alert Topics
We categorize alerts into different topics based on asset class. You can subscribe to one or more of these depending on your needs. For example, stock alerts are published on the daily-insights-bw-external topic.
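If you subscribe to several topics, each topic's schema is looked up under its own Schema Registry subject. The sketch below assumes subjects follow the "<topic>-value" naming used in the example consumer in Step 4. The value_subjects helper is hypothetical, and the second topic name is a placeholder, not a real Bridgewise topic.

```python
def value_subjects(topics):
    """Map each topic name to its assumed Schema Registry subject name."""
    return {topic: f"{topic}-value" for topic in topics}


# The second entry is a placeholder; ask Bridgewise support for actual topic names.
topics = ["daily-insights-bw-external", "<another_topic_name>"]
subjects = value_subjects(topics)
```

The same topics list can then be passed to the consumer's subscribe() call, which accepts a list of topic names.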
Step 3: Understanding the Alert Message (JSON Schema)
Every message you receive from a topic will be a JSON object. Here’s a breakdown of a sample alert for a stock, with explanations for the key fields.
Sample JSON Message:
{
  "identifier": "24893",
  "companyId": 24893,
  "tradingItemId": 2590144,
  "eventAmount": 1,
  "eventAttributes": {
    "assetClass": "Stock",
    "assetThresholdPrice": "0.0",
    "category": "Price Event",
    "assetChangePercent": -6.3,
    "language": "en-US",
    "listOfSymbols": [ "ANSS" ],
    "eventSentiment": "Negative",
    "eventImportanceLevel": 2,
    "subject_en-US": "ANSYS is dropping 6.3% today! Go see ANSS for more details.",
    "subject_ja-JP": "ANSSが本日6.1%下落中!詳細はANSSをチェック!.",
    "subject_es-ES": "¡ANSYS está bajando 6.3% hoy! Consulta ANSS para más detalles.",
    "title_en-US": "Sharp Move Down",
    "title_ja-JP": "急激な下落",
    "triggerPrice": 326.31
  },
  "eventDate": "2025-05-28 19:40:05.989797 UTC",
  "eventStatus": "New",
  "eventType": "AggressiveMoveIdentificationAlertLow",
  "isPublicEvent": true,
  "uniqueEventId": "6f85f394-bae8-4ead-a18a-3e3cfbcffc59"
}
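The eventDate value in the sample is a plain string, so you may want to convert it to a timezone-aware datetime before comparing or storing it. This is a minimal sketch based only on the format in the sample message. The parse_event_date helper is hypothetical, and the second format (no fractional seconds) is an assumption included as a fallback.

```python
from datetime import datetime, timezone

# First format matches the sample message; the second is an assumed variant.
_EVENT_DATE_FORMATS = ("%Y-%m-%d %H:%M:%S.%f UTC", "%Y-%m-%d %H:%M:%S UTC")


def parse_event_date(value):
    """Parse an eventDate string such as '2025-05-28 19:40:05.989797 UTC'."""
    for fmt in _EVENT_DATE_FORMATS:
        try:
            # strptime returns a naive datetime; attach UTC explicitly.
            return datetime.strptime(value, fmt).replace(tzinfo=timezone.utc)
        except ValueError:
            continue
    raise ValueError(f"Unrecognized eventDate format: {value!r}")
```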
Key Fields Explained:
uniqueEventId: Use this to uniquely identify each alert and prevent processing duplicates.
identifier, companyId, tradingItemId: Internal Bridgewise identifiers for the asset.
eventDate: The UTC timestamp of when the event occurred.
eventAttributes: An object containing the core details of the alert. Its key fields are:
eventAttributes.assetClass: The type of asset (e.g., "Stock", "Crypto").
eventAttributes.assetChangePercent: The percentage change that triggered the alert.
eventAttributes.eventSentiment: "Positive" or "Negative".
eventAttributes.subject_{language}: The full alert text, localized. For example, subject_en-US is the English version, and subject_ja-JP is the Japanese version.
eventAttributes.title_{language}: A short, localized title for the alert.
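The field descriptions above suggest two common helpers: skipping alerts whose uniqueEventId has already been processed, and picking a localized subject or title with a fallback language. The sketch below is one possible approach, not Bridgewise's implementation. SeenEvents and localized are hypothetical names, and the in-memory cache is lost on restart, so a production system would likely need persistent storage.

```python
from collections import OrderedDict


class SeenEvents:
    """Remember the most recent uniqueEventId values, up to max_size."""

    def __init__(self, max_size=10000):
        self._ids = OrderedDict()
        self._max_size = max_size

    def is_new(self, alert):
        """Return True the first time an alert's uniqueEventId is seen."""
        event_id = alert.get("uniqueEventId")
        if event_id is None:
            return True  # Cannot deduplicate without an ID; treat as new.
        if event_id in self._ids:
            return False
        self._ids[event_id] = None
        if len(self._ids) > self._max_size:
            self._ids.popitem(last=False)  # Evict the oldest ID.
        return True


def localized(attributes, field, language, fallback="en-US"):
    """Return e.g. title_ja-JP, falling back to title_en-US if it is absent."""
    return attributes.get(f"{field}_{language}") or attributes.get(f"{field}_{fallback}")
```

For example, the sample message has no title_es-ES, so localized(attributes, "title", "es-ES") would return the English title instead.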
Step 4: Example Consumer Code (Python)
Here is a practical Python example to get you started. It uses the confluent-kafka library.
Note: This example deserializes each message into a Python dictionary and handles basic consumer errors.
# You'll need to install the library first:
# pip install confluent-kafka
from confluent_kafka import DeserializingConsumer, KafkaException, KafkaError
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.json_schema import JSONDeserializer
# --- 1. Configuration: Fill in your credentials here ---
# Your Bridgewise customer support contact will provide all the following values.
KAFKA_HOSTNAME = "<your_kafka_host_name>"
KAFKA_USERNAME = "<your_kafka_username>"
KAFKA_PASSWORD = "<your_kafka_password>"
SCHEMA_REGISTRY_URL = "<your_schema_registry_url>"
SCHEMA_REGISTRY_KEY = "<your_schema_registry_key>"
SCHEMA_REGISTRY_SECRET = "<your_schema_registry_secret>"
# The topic you want to subscribe to
# For stocks, use: "daily-insights-bw-external"
TOPIC_NAME = "daily-insights-bw-external"
# --- 2. Set up Schema Registry and Deserializer ---
# The schema registry helps the consumer understand the structure of the JSON messages.
schema_registry_client = SchemaRegistryClient({
    "url": SCHEMA_REGISTRY_URL,
    "basic.auth.user.info": f"{SCHEMA_REGISTRY_KEY}:{SCHEMA_REGISTRY_SECRET}"
})
# Get the latest schema for our topic
subject_name = TOPIC_NAME + "-value"
latest_schema = schema_registry_client.get_latest_version(subject_name)
# This function helps decode the message from Kafka into a Python dictionary
json_deserializer = JSONDeserializer(latest_schema.schema.schema_str, from_dict=lambda x, ctx: x)
# --- 3. Kafka Consumer Configuration ---
# This dictionary holds all the connection settings.
consumer_conf = {
    "bootstrap.servers": KAFKA_HOSTNAME,
    "sasl.username": KAFKA_USERNAME,
    "sasl.password": KAFKA_PASSWORD,
    "security.protocol": "SASL_SSL",
    "sasl.mechanisms": "PLAIN",
    "group.id": "my_application_consumer_group_1",  # A unique name for your consumer group
    "auto.offset.reset": "latest",  # Start reading from the newest messages
    "value.deserializer": json_deserializer  # Use the deserializer we configured
}
# --- 4. Main Application Logic ---
# Create and subscribe the consumer
consumer = DeserializingConsumer(consumer_conf)
consumer.subscribe([TOPIC_NAME])
print(f"Subscribed to '{TOPIC_NAME}'. Waiting for messages...")
try:
    while True:
        # Poll for a new message, waiting up to one second
        msg = consumer.poll(timeout=1.0)
        if msg is None:
            # No message received within the timeout
            continue
        if msg.error():
            # Handle potential errors
            if msg.error().code() == KafkaError._PARTITION_EOF:
                # End of partition event, not an error.
                continue
            else:
                raise KafkaException(msg.error())
        # If we get here, we have a valid message!
        # The `msg.value()` is already a Python dictionary because of the deserializer.
        alert_data = msg.value()
        # Use an empty dict if eventAttributes is missing, to avoid a KeyError
        attributes = alert_data.get('eventAttributes', {})
        # Now you can use the data in your application
        print(f"Received Alert! ID: {alert_data.get('uniqueEventId')}")
        print(f" -> Title (EN): {attributes.get('title_en-US')}")
        print(f" -> Subject (EN): {attributes.get('subject_en-US')}\n")
except KeyboardInterrupt:
    print("Stopping consumer...")
finally:
    # Cleanly close the connection
    consumer.close()
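Once messages are arriving, you will usually want to act only on the alerts relevant to your application. The sketch below shows one possible filter using the assetClass and listOfSymbols fields from the sample message. The is_relevant helper and its parameters are hypothetical and not part of any Bridgewise or Confluent API.

```python
def is_relevant(alert, symbols=None, asset_classes=None):
    """Return True if the alert matches the given symbols and asset classes.

    Passing None for a filter means "do not filter on this field".
    """
    attrs = alert.get("eventAttributes") or {}
    if asset_classes is not None and attrs.get("assetClass") not in asset_classes:
        return False
    if symbols is not None:
        alert_symbols = set(attrs.get("listOfSymbols") or [])
        if not alert_symbols & set(symbols):
            return False
    return True
```

Inside the polling loop, you could call is_relevant(alert_data, symbols={"ANSS"}) and skip the message when it returns False.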