Use TigerGraph as a (Spark) Data source

Hi team,

I see that the JDBC connector for TigerGraph is deprecated. I've always felt that TG makes it tough to act as a data source. Most places/articles show how to use (Py)Spark/Kafka to load data into TigerGraph, but not the other way around. I have millions of rows of data returned by a TG query that I want to ingest into a Kafka topic. How can I do that?

“Most places/articles show how to use (Py)Spark/Kafka to load data into TigerGraph, but not the other way around.”

@interesting_ideas I’ll bring this feedback to the Product Team. In most cases you can connect TigerGraph’s REST endpoints to other tools/products for data egress.

Unfortunately, Apache Kafka itself does not natively support ingestion directly from REST endpoints because Kafka is fundamentally a distributed streaming platform that operates with its own native protocols for data transmission. However, you can facilitate data ingestion from REST endpoints into Kafka topics through a few different approaches using auxiliary tools and services. Here are some popular methods:

1. Kafka Connect with REST APIs

Kafka Connect is a tool for scalably and reliably streaming data between Apache Kafka and other systems using source and sink connectors. It can be used to ingest data from REST APIs into Kafka using an HTTP Source Connector. The steps include:

  • Set up Kafka Connect: Install and configure Kafka Connect on your Kafka cluster.
  • Configure the HTTP Source Connector: Use an existing connector like the Confluent HTTP Source Connector, or another community connector capable of calling REST APIs at scheduled intervals and writing the responses to a Kafka topic.

Example configuration snippet for a Kafka Connect HTTP Source Connector:

name=http-source-connector
connector.class=io.confluent.connect.http.HttpSourceConnector
tasks.max=1
http.api.url=https://api.example.com/data
kafka.topic=ingest-topic
confluent.topic.bootstrap.servers=localhost:9092
confluent.topic.replication.factor=1

This configuration defines a source connector that fetches data from https://api.example.com/data and publishes it to the ingest-topic Kafka topic.
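
If you prefer to register the connector programmatically instead of via a properties file, the same settings can be submitted to the Kafka Connect REST interface (by default on port 8083). A minimal sketch in Python, assuming Connect is running locally and the HTTP Source Connector plugin is installed:

import requests

# Same settings as the properties snippet above, expressed as JSON for the
# Kafka Connect REST API (assumed to be listening on localhost:8083)
connector = {
    "name": "http-source-connector",
    "config": {
        "connector.class": "io.confluent.connect.http.HttpSourceConnector",
        "tasks.max": "1",
        "http.api.url": "https://api.example.com/data",
        "kafka.topic": "ingest-topic",
        "confluent.topic.bootstrap.servers": "localhost:9092",
        "confluent.topic.replication.factor": "1",
    },
}

response = requests.post("http://localhost:8083/connectors", json=connector)
response.raise_for_status()
print(response.json())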

2. Kafka REST Proxy

The Confluent REST Proxy provides a RESTful interface to Kafka. It allows producers and consumers to publish and retrieve Kafka data using HTTP/HTTPS. This method is typically used in scenarios where clients cannot use the native Kafka protocol.

To ingest data using the Kafka REST Proxy, you would need a separate process that polls or listens to your REST endpoints and then sends the fetched data to Kafka through the REST Proxy.
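
For illustration, here is a minimal sketch of producing records through the REST Proxy's v2 API from Python, assuming the proxy is running on localhost:8082 and the topic ingest-topic already exists (the record contents are placeholders):

import requests

# One produce request can carry a batch of records
payload = {"records": [{"value": {"id": 1, "name": "example"}}]}

response = requests.post(
    "http://localhost:8082/topics/ingest-topic",
    headers={"Content-Type": "application/vnd.kafka.json.v2+json"},
    json=payload,
)
response.raise_for_status()
print(response.json())  # reports the partitions/offsets of the produced records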

3. Custom Producer Application

Develop a custom application that acts as middleware between your REST endpoints and Kafka (see the TigerGraph -(Python)-> Kafka example below). This application would:

  • Poll the REST API periodically or react to webhooks.
  • Fetch the data and serialize it into a suitable format.
  • Produce the data to a Kafka topic using the Kafka client libraries available for languages like Java, Python, or Node.js.

4. Stream Processing Frameworks

Use a stream processing framework like Apache NiFi or StreamSets, which have capabilities to ingest data from various sources including REST APIs. These frameworks offer robust data ingestion, transformation, and delivery to Kafka.

5. Using Scheduled Scripts

A simple but less scalable approach involves scheduled scripts (e.g., written in Python, Shell) that run at specific intervals (using cron jobs, for example), fetch data from REST endpoints, and push it to Kafka using Kafka’s client libraries or REST Proxy.
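
As a rough sketch, a cron-driven script of this kind might look like the following (the endpoint, topic, broker address, and schedule are placeholders):

# fetch_and_produce.py -- minimal cron-driven sketch (placeholder endpoint/topic)
# Example crontab entry to run it every 15 minutes:
#   */15 * * * * python3 /opt/jobs/fetch_and_produce.py
import json

import requests
from kafka import KafkaProducer

response = requests.get("https://api.example.com/data", timeout=60)
response.raise_for_status()

producer = KafkaProducer(
    bootstrap_servers=["localhost:9092"],
    value_serializer=lambda x: json.dumps(x).encode("utf-8"),
)
for record in response.json():
    producer.send("ingest-topic", value=record)
producer.flush()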

TigerGraph -(Python)-> Kafka

Step 1: Set Up Your Kafka Environment

Before you begin ingesting data, ensure that your Kafka cluster is properly set up. This includes setting up Kafka brokers, creating the necessary Kafka topics, and configuring Kafka producers. If you haven’t already set up Kafka, you can follow these general steps:

  1. Install Kafka: Download and install Apache Kafka from the Apache website. Follow the installation instructions specific to your operating system.

  2. Start Zookeeper and Kafka Server:

    # Start Zookeeper
    bin/zookeeper-server-start.sh config/zookeeper.properties
    
    # Start Kafka server
    bin/kafka-server-start.sh config/server.properties
    
  3. Create a Kafka Topic:

    bin/kafka-topics.sh --create --topic tigergraph_data --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1
    

Step 2: Configure TigerGraph

Ensure that your TigerGraph system is correctly configured and that you have the necessary queries or APIs set up to extract data.

  1. Install GSQL Client (if not already installed):
    Download and configure the GSQL client to connect to your TigerGraph server.

  2. Create a Graph Query (if not already created):
    Develop a GSQL query that outputs your desired data. Ensure the query is optimized for large data exports.

Step 3: Develop a Data Extraction and Streaming Application

Write a custom application that executes your TigerGraph query, fetches results, and sends them to Kafka. Here’s how you can do this in Python using pyTigerGraph for TigerGraph connectivity and kafka-python for Kafka integration.

  1. Install Required Libraries:

    pip install pyTigerGraph kafka-python
    
  2. Sample Python Script:

    import pyTigerGraph as tg
    from kafka import KafkaProducer
    import json

    # Configuration
    tg_host = 'http://your.tigergraph.server'
    tg_graphname = 'your_graph'
    tg_secret = 'your_secret'
    kafka_bootstrap_servers = ['localhost:9092']
    kafka_topic = 'tigergraph_data'

    # Initialize TigerGraph connection
    # (if you only have a secret rather than a token, exchange it first, e.g. conn.getToken(tg_secret))
    conn = tg.TigerGraphConnection(host=tg_host, graphname=tg_graphname, apiToken=tg_secret)

    # Initialize Kafka producer
    producer = KafkaProducer(bootstrap_servers=kafka_bootstrap_servers,
                             value_serializer=lambda x: json.dumps(x).encode('utf-8'))

    # Define the query
    query_name = "your_query_here"

    # Execute the query (the timeout is in milliseconds)
    results = conn.runInstalledQuery(query_name, timeout=60000)

    # Send data to Kafka
    for result in results[0]["@@result_variable_name"]:
        producer.send(kafka_topic, value=result)

    # Flush once after the loop so all buffered messages are delivered
    producer.flush()

    print("Data ingestion to Kafka topic is complete.")
    

Hi John,
Thanks for the quick response.

My initial thought is that REST API calls are not really the best way to move large amounts of data. I am working on a scenario where the query output would be 10-50 million records per day.
I also think there are limits on the REST payload size; I don't remember exactly. Please shed some light on this too.

I understand the Pythonic way too, but I think there is no parallelism there.

@interesting_ideas with a query output of 10-50 million records, will this be more of a batch operation? One method you could employ is writing the TigerGraph query output to a file and adding it to a hot/drop folder, then using Dask, which is designed for processing large files, to push the data to Kafka.

More information on Dask → Kafka

Using Dask for handling large-scale data processing tasks such as loading massive CSV files into Kafka can indeed be an efficient alternative to manually implementing parallelism and batching. Dask is particularly well-suited for parallel computing in Python and can handle large datasets by breaking them into smaller chunks which can be processed in parallel across multiple threads, processes, or even different machines.

Advantages of Using Dask:

  1. Distributed Computing: Dask can distribute data and computation tasks over multiple cores or even different machines, significantly speeding up processing times for large datasets.
  2. Lazy Evaluation: Dask operations are lazily evaluated, which means they don’t compute results immediately but wait until you explicitly ask for them. This allows Dask to optimize the entire computation (see the small sketch after this list).
  3. Scalability: Dask scales gracefully from single machines to clusters, providing a smooth transition from development to production environments without needing to switch tools.
  4. Integration with Existing Tools: Dask integrates well with existing Python libraries like Pandas, NumPy, and Scikit-Learn, making it easy to incorporate into existing data processing workflows.
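
As a small illustration of the lazy evaluation mentioned in point 2 (the file path is a placeholder), nothing is read until compute() is called:

import dask.dataframe as dd

# Building the task graph only -- no data is read yet
ddf = dd.read_csv("path_to_your_large_file.csv", blocksize=25e6)
row_count = ddf.shape[0]

# The work happens here, in parallel across partitions
print(row_count.compute())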

Using Dask to Process and Load Data into Kafka:

Here’s how you might set up a workflow using Dask to process CSV files and load the data into Kafka:

1. Install Necessary Libraries

You’ll need dask, dask[distributed] for setting up a distributed environment, and confluent_kafka for Kafka integration.

pip install dask[complete] confluent-kafka

2. Setup Dask Distributed Client

This step involves setting up a Dask client which will help in managing the distribution of data and tasks.

from dask.distributed import Client

client = Client()  # Starts a local Dask client

3. Read Data with Dask

Use Dask DataFrame to read data from CSV. Dask DataFrame splits the large CSV file into smaller partitions, which can be processed in parallel.

import dask.dataframe as dd

# Adjust the blocksize according to the size of your data and number of cores
ddf = dd.read_csv('path_to_your_large_file.csv', blocksize=25e6)  # 25 MB chunks

4. Define Kafka Producer Function

Set up a function that sends batches of data to Kafka. This function processes each partition of the Dask DataFrame and returns a small per-partition summary so that Dask has a concrete result to collect.

from confluent_kafka import Producer
import pandas as pd
import json

def send_to_kafka(partition, topic_name='your_kafka_topic'):
    # One producer per partition, so the function also works on remote Dask workers
    producer = Producer({'bootstrap.servers': 'localhost:9092'})
    for _, row in partition.iterrows():
        producer.produce(topic=topic_name, value=json.dumps(row.to_dict()))
        producer.poll(0)  # serve delivery callbacks and keep the local queue moving
    producer.flush()
    # Return a tiny summary (rows sent) so Dask has a concrete result per partition
    return pd.Series([len(partition)])

5. Apply Function to Each Partition

Use the map_partitions method to apply the send_to_kafka function to each partition of the Dask DataFrame. This will run in parallel across all partitions.

# Apply the function to all partitions; the meta describes the small
# per-partition summary Series that send_to_kafka returns
result = ddf.map_partitions(send_to_kafka, meta=(None, 'int64')).compute()

Considerations:

  • Monitoring: Monitor the Dask dashboard to optimize the chunk sizes and resource allocation.
  • Error Handling: Implement try-except blocks within the send_to_kafka function to handle possible Kafka errors like timeouts or network issues (see the sketch after this list).
  • Performance Tuning: Adjust the blocksize parameter based on the size of your dataset and the memory capacity of your processing nodes. Smaller blocks mean more tasks and can lead to overhead, whereas larger blocks might lead to underutilization of resources.
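
As a sketch of the error-handling point above (kept consistent with the send_to_kafka function defined earlier), confluent-kafka lets you attach a delivery callback and catch producer-side errors:

from confluent_kafka import Producer, KafkaException
import pandas as pd
import json

def delivery_report(err, msg):
    # Invoked by poll()/flush() once per message to report success or failure
    if err is not None:
        print(f"Delivery failed: {err}")

def send_to_kafka(partition, topic_name='your_kafka_topic'):
    producer = Producer({'bootstrap.servers': 'localhost:9092'})
    for _, row in partition.iterrows():
        payload = json.dumps(row.to_dict())
        try:
            producer.produce(topic=topic_name, value=payload, on_delivery=delivery_report)
        except BufferError:
            # Local queue is full: let the producer drain, then retry this record once
            producer.poll(1)
            producer.produce(topic=topic_name, value=payload, on_delivery=delivery_report)
        except KafkaException as exc:
            print(f"Kafka error while producing: {exc}")
        producer.poll(0)
    producer.flush()
    return pd.Series([len(partition)])  # same per-partition summary as before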

Using Dask provides a flexible, efficient way to handle large volumes of data for ingestion into Kafka, especially when the data operations are complex and need to be distributed across several processors or nodes. It simplifies managing parallelism and can dynamically scale based on workload.

We are currently using the file-based approach, but we want to understand if there is a faster way to do this without writing to files and reading them back.

Can Dask be used to read data from the TG REST API?

@interesting_ideas Yes, Dask can be used to read data from the TigerGraph (TG) REST API, although it involves a few steps to set up and manage the process efficiently. Here’s a general approach to using Dask to read data from the TigerGraph REST API:

Step 1: Set Up TigerGraph REST API Access

Ensure you have the necessary endpoint, credentials, and permissions to access the TigerGraph REST API. You typically need the following:

  • Base URL of the API
  • Endpoint paths for specific resources
  • Authentication tokens or credentials
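
For example, if you use pyTigerGraph, one way to obtain a token for the Authorization header is roughly the following (the host, graph name, and secret are placeholders, and details can vary by TigerGraph version):

import pyTigerGraph as tg

# Exchange a secret for a REST API token (placeholder values)
conn = tg.TigerGraphConnection(host="http://your.tigergraph.server",
                               graphname="your_graph")
token = conn.getToken("your_secret")[0]  # first element of the returned tuple is the token

headers = {"Authorization": f"Bearer {token}", "Content-Type": "application/json"}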

Step 2: Define Functions to Query the API

You can use Python libraries like requests to define functions that interact with the TigerGraph REST API. Here’s a basic function to perform API calls:

import requests

def fetch_data_from_tg(endpoint, params=None, headers=None):
    """Fetch data from a specified TG endpoint."""
    response = requests.get(endpoint, params=params, headers=headers)
    response.raise_for_status()  # Raise an HTTPError for 4xx/5xx responses
    return response.json()

Step 3: Set Up Dask

Install Dask and familiarize yourself with its basics. You can use Dask’s delayed function to parallelize data fetching operations. Here’s how you can integrate Dask:

from dask import delayed, compute
import dask.bag as db

# Example endpoints or queries
endpoints = [
    "http://your-tg-api.com/query1",
    "http://your-tg-api.com/query2",
    # Add more as needed
]

# Headers usually include authentication tokens and content type
headers = {"Authorization": "Bearer YOUR_ACCESS_TOKEN", "Content-Type": "application/json"}

# Create delayed objects for asynchronous execution
tasks = [delayed(fetch_data_from_tg)(ep, headers=headers) for ep in endpoints]

# Execute tasks in parallel
results = compute(*tasks)

Step 4: Handling Large Datasets

If the data retrieved from TigerGraph is large, consider using Dask’s Bag or DataFrame structures to process data in chunks. This helps manage memory usage and can speed up processing times:

# Assuming 'results' contains large datasets
dask_bag = db.from_sequence(results)
processed_data = dask_bag.map(process_data_function)  # Define your processing function
processed_data.compute()

Note: Be sure to handle API rate limits and timeouts in your functions to maintain robustness in your data pipeline.
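
For instance, a variant of fetch_data_from_tg with a request timeout and a simple retry/backoff loop might look like this (the retry count and backoff are arbitrary choices):

import time
import requests

def fetch_with_retries(endpoint, params=None, headers=None, max_retries=3, timeout=60):
    """fetch_data_from_tg with a timeout plus simple exponential backoff on failures."""
    for attempt in range(max_retries):
        try:
            response = requests.get(endpoint, params=params, headers=headers, timeout=timeout)
            response.raise_for_status()
            return response.json()
        except (requests.Timeout, requests.ConnectionError):
            if attempt == max_retries - 1:
                raise
            time.sleep(2 ** attempt)  # back off before retrying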

Thanks for the detailed answer Jon.

I want to take a step back and understand what you are doing here. Are you hitting:

  • multiple APIs for different queries, or
  • the same query, but providing a starting record and an ending record?

We have a query that outputs at least 10M records when run. It can also go way beyond that, up to around 50M. In this case, we can't make a single API request to get all the data, since the payload would be too large.

We might require pagination of results, say 50k results per request; then we would need to fire at least 200 requests. The number of records per request could be lower, in which case we would need to make even more requests. Does this affect the health of the server where TG is installed?

And how do we do pagination for a GSQL query?