Data Store Connectors

Data store connectors can only be configured after you have set up one or more service bindings:

Cloudant Connector

import wiotp.sdk.application

options = wiotp.sdk.application.parseEnvVars()
appClient = wiotp.sdk.application.ApplicationClient(options)

serviceBinding = {
    "name": "test-cloudant",
    "description": "Test Cloudant instance",
    "type": "cloudant",
    "credentials": {
        "host": "hostname",
        "port": 443,
        "username": "username",
        "password": "password"
    }
}

cloudantService = appClient.serviceBindings.create(serviceBinding)

# Create the connector
connector = self.appClient.dsc.create(
    name="connector1", type="cloudant", serviceId=cloudantService.id, timezone="UTC",
    description="A test connector", enabled=True
)

# Create a destination under the connector
destination1 = connector.destinations.create(name="all-data", bucketInterval="DAY")

# Create a rule under the connector, that routes all events to the destination
rule1 = connector.rules.createEventRule(
    name="allevents", destinationName=destination1.name, typeId="*", eventId="*",
    description="Send all events", enabled=True
)
# Create a second rule under the connector, that routes all state to the same destination
rule2 = connector.rules.createStateRule(
    name="allstate", destinationName=destination1.name, logicalInterfaceId="*",
    description="Send all state", enabled=True,
)

Event Streams Connector

import wiotp.sdk.application
from wiotp.sdk.api.services import EventStreamsServiceBindingCredentials, EventStreamsServiceBindingCreateRequest

options = wiotp.sdk.application.parseEnvVars()
appClient = wiotp.sdk.application.ApplicationClient(options)

serviceBinding = {
    "name": "test-eventstreams",
    "description": "Test EventStreams instance",
    "type": "eventstreams",
    "credentials": {
        "api_key": "EVENTSTREAMS_API_KEY",
        "user": "EVENTSTREAMS_USER",
        "password": "EVENTSTREAMS_PASSWORD",
        "kafka_admin_url": "EVENTSTREAMS_ADMIN_URL",
        "kafka_brokers_sasl": [
            "EVENTSTREAMS_BROKER1",
            "EVENTSTREAMS_BROKER2",
        ],
    },
}

eventStreamsService = appClient.serviceBindings.create(serviceBinding)

# Create the connector
connector = self.appClient.dsc.create(
    name="connectorES", type="eventstreams", serviceId=eventStreamsService.id, timezone="UTC",
    description="A test event streams connector", enabled=True
)

# Create a destination under the connector
destination1 = connector.destinations.create(name="all-data", partitions=3)

# Create a rule under the connector, that routes all events to the destination
rule1 = connector.rules.createEventRule(
    name="allevents", destinationName=destination1.name, typeId="*", eventId="*",
    description="Send all events", enabled=True
)
# Create a second rule under the connector, that routes all state to the same destination
rule2 = connector.rules.createStateRule(
    name="allstate", destinationName=destination1.name, logicalInterfaceId="*",
    description="Send all state", enabled=True
)

DB2 Connector

import wiotp.sdk.application

options = wiotp.sdk.application.parseEnvVars()
appClient = wiotp.sdk.application.ApplicationClient(options)

credentials = {
  "hostname": "DB2_HOST",
  "port": "DB2_PORT",
  "username": "DB2_USERNAME",
  "password": "DB2_PASSWORD",
  "https_url": "DB2_HTTPS_URL",
  "ssldsn": "DB2_SSL_DSN",
  "host": "DB2_HOST",
  "uri": "DB2_URI",
  "db": "DB2_DB",
  "ssljdbcurl": "DB2_SSLJDCURL",
  "jdbcurl": "DB2_JDBCURL"
}

serviceBinding = {
    "name": "test-db2",
    "description": "Test DB2 instance",
    "type": "db2",
    "credentials": credentials
}

db2Service = appClient.serviceBindings.create(serviceBinding)

# Create the connector
connector = self.appClient.dsc.create(
    name="connectorDB2",
    type="db2",
    serviceId=db2Service.id,
    timezone="UTC",
    description="A test connector",
    enabled=True
)

# Create a destination under the connector
columns = [
    {"name": "TEMPERATURE_C", "type": "REAL", "nullable": False},
    {"name": "HUMIDITY", "type": "INTEGER", "nullable": True},
    {"name": "TIMESTAMP", "type": "TIMESTAMP", "nullable": False},
]

destination1 = connector.destinations.create(name="test_destination_db2", columns=columns)

# Create a rule under the connector, that routes all state to the same destination
# We can only forward state to a db2 connector, not the raw events
ruleConfiguration={
    "columnMappings": {
        "TEMPERATURE_C": "$event.state.temp.C",
        "HUMIDITY": "$event.state.humidity",
        "TIMESTAMP": "$event.timestamp"
    }
}

rule = connector.rules.createStateRule(
    name="Environment State Forwarding Rule",
    destinationName=destination1.name,
    logicalInterfaceId="123456789012345678901234",
    description="Write environment state to target table",
    enabled=True,
    configuration=ruleConfiguration
)

Postgres Connector

import wiotp.sdk.application

options = wiotp.sdk.application.parseEnvVars()
appClient = wiotp.sdk.application.ApplicationClient(options)

credentials = {
  "hostname": "POSTGRES_HOSTNAME",
  "port": "POSTGRES_PORT",
  "username": "POSTGRES_USERNAME",
  "password": "POSTGRES_PASSWORD",
  "certificate": "POSTGRES_CERTIFICATE",
  "database": "POSTGRES_DATABASE"
}

serviceBinding = {
    "name": "test-postgres",
    "description": "Test Postgres instance",
    "type": "postgres",
    "credentials": credentials
}

postgresService = appClient.serviceBindings.create(serviceBinding)

# Create the connector
connector = self.appClient.dsc.create(
    name="connectorPostgres",
    type="postgres",
    serviceId=postgresService.id,
    timezone="UTC",
    description="A test connector",
    enabled=True
)

# Create a destination under the connector
columns = [
    {"name": "TEMPERATURE_C", "type": "REAL", "nullable": False},
    {"name": "HUMIDITY", "type": "INTEGER", "nullable": True},
    {"name": "TIMESTAMP", "type": "TIMESTAMP", "nullable": False},
]

destination1 = connector.destinations.create(name="test_destination_postgres", columns=columns)

# Create a rule under the connector, that routes all state to the same destination
# We can only forward state to a postgres connector, not the raw events
ruleConfiguration={
    "columnMappings": {
        "TEMPERATURE_C": "$event.state.temp.C",
        "HUMIDITY": "$event.state.humidity",
        "TIMESTAMP": "$event.timestamp"
    }
}

rule = connector.rules.createStateRule(
    name="Environment State Forwarding Rule",
    destinationName=destination1.name,
    logicalInterfaceId="123456789012345678901234",
    description="Write environment state to target table",
    enabled=True,
    configuration=ruleConfiguration
)

Connectors

Tip

You can have multiple connectors (up to 10, depending on your service plan) of mixed types (Cloudant or EventStreams). This means 10 combined, not 10 of each.

import wiotp.sdk.application

options = wiotp.sdk.application.parseEnvVars()
appClient = wiotp.sdk.application.ApplicationClient(options)

# Create the connector
serviceId = "xxx"
connector = appClient.dsc.create(
    name="test-connector-cloudant", serviceId=serviceId, timezone="UTC", description="A test connector", enabled=True
)

print(" - " + connector.name)
print(" - " + connector.connectorType)
print(" - Enabled: " + connector.enabled)

Destinations

Each connector can have multiple destinations defined (up to 100 depending on your service plan)

Tip

Destinations are immutable. If you want to change where you send events to:

  • Create a new destination
  • Update the forwarding rule to reference the new destination
  • Delete the old destination
import wiotp.sdk.application

options = wiotp.sdk.application.parseEnvVars()
appClient = wiotp.sdk.application.ApplicationClient(options)

connectorId = "xxx"
connector = appClient.dsc[connectorId]

# Create a destination under the connector
destination1 = connector.destinations.create(name=destinationName, bucketInterval="DAY")

Forwarding Rules

Forwarding rules configure what kind of data and the scope of the data that is sent to a destination. Each connector can have multiple forwarding rules defined (up to 100 depending on your service plan)

Tip

Each forwarding rule can only route to a single destination, but multiple rules can reference the same destination

import wiotp.sdk.application

options = wiotp.sdk.application.parseEnvVars()
appClient = wiotp.sdk.application.ApplicationClient(options)

# Note: This code assumes a destination named "all-data" has already been created under this connector
connectorId = "xxx"
connector = appClient.dsc[connectorId]

# Create a rule under the connector, that routes all events to the destination
rule1 = connector.rules.createEventRule(
    name="allevents",
    destinationName="all-data",
    typeId="*",
    eventId="*",
    description="Send all events",
    enabled=True
)

# Create a second rule under the connector, that routes all state to the same destination
rule2 = createdConnector.rules.createStateRule(
    name="allstate",
    destinationName="all-data",
    logicalInterfaceId="*",
    description="Send all state",
    enabled=True,
)