Harnessing Kafka Streams for Seamless Data Tasks

Learn how to use Kafka for convenient data exploration and dynamic data integration with its publish-subscribe model, which organizes data into topics.

By Ilia Ivankin · Jun. 18, 24 · Tutorial

Reason

In modern distributed systems, efficient real-time data discovery and integration are common tasks. We often encounter approaches involving multiple queues or services that collect data into a database, followed by various search queries. But what if I told you that storing data in a database is unnecessary when you can read data directly from Kafka, as if it were a table?

Apache Kafka is a powerful event streaming platform offering robust data discovery and real-time data integration capabilities. This makes it a versatile solution for building scalable event-driven architectures.

This article will explore using Kafka for convenient data exploration and dynamic data integration.

Short Intro

Understanding Kafka Topics and Event Streams

Apache Kafka operates on a "publish-subscribe" model, organizing data into topics. Producers publish records (events) to specific topics, and consumers subscribe to these topics to process records in real time. Each record in a topic is typically a key-value pair representing structured data.
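
To make the model concrete, here is a minimal producer sketch using the standard Kafka clients library. The broker address, topic name, key, and payload below are assumptions for illustration, not part of the original setup:

Java
 
// imports: org.apache.kafka.clients.producer.*, org.apache.kafka.common.serialization.StringSerializer, java.util.Properties
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumed local broker
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
    // each record is a key-value pair: the key (user_id here) groups related events,
    // while the value carries the structured payload
    producer.send(new ProducerRecord<>("events",
            "af9dd378-0021-42c8-82ca-9fc1a861a342",
            "{\"category\": \"Electronics\", \"app\": \"GoShop\"}"));
    producer.flush();
}

A consumer on the other side simply subscribes to the "events" topic and polls for new records; it never needs to know who produced them.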

Key Features for Data Discovery in Kafka

Topic Partitioning and Scalability

Kafka topics are divided into partitions, allowing data to be distributed across multiple brokers for scalability. This partitioning enables parallel processing and efficient data retrieval.
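
For keyed records, the partition is derived from the key, which is what makes parallel reads safe per key. The sketch below shows roughly how Kafka's built-in partitioner maps a key onto a partition; the partition count of six is an assumption for the example:

Java
 
// imports: org.apache.kafka.common.serialization.StringSerializer, org.apache.kafka.common.utils.Utils
int numPartitions = 6; // assumed partition count of the "events" topic
String userId = "af9dd378-0021-42c8-82ca-9fc1a861a342";

// roughly what the built-in partitioner does for keyed records:
// hash the serialized key and map it onto one of the available partitions
byte[] keyBytes = new StringSerializer().serialize("events", userId);
int partition = Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;

// every event with this user_id lands on the same partition, so per-key ordering
// is preserved while consumers in a group process partitions in parallel
System.out.println("user " + userId + " -> partition " + partition);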

Retention and Compaction

Kafka retains data for a configurable period, allowing consumers to replay events within a specified time window. Log compaction ensures that only the latest value for each key is retained, simplifying data management.
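
Both behaviors are plain topic-level configuration. As a rough sketch (broker address, topic names, partition and replica counts, and the seven-day retention value are all assumptions, and error handling is omitted), such topics could be created with the admin client like this:

Java
 
// imports: org.apache.kafka.clients.admin.*, org.apache.kafka.common.config.TopicConfig, java.util.*
Properties props = new Properties();
props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumed local broker

try (AdminClient admin = AdminClient.create(props)) {
    // "events" keeps records for seven days so consumers can replay that window
    NewTopic events = new NewTopic("events", 6, (short) 1)
            .configs(Map.of(TopicConfig.RETENTION_MS_CONFIG,
                            String.valueOf(7L * 24 * 60 * 60 * 1000)));

    // a compacted topic keeps only the latest value per key (hypothetical topic name)
    NewTopic latestByUser = new NewTopic("events-latest-by-user", 6, (short) 1)
            .configs(Map.of(TopicConfig.CLEANUP_POLICY_CONFIG,
                            TopicConfig.CLEANUP_POLICY_COMPACT));

    admin.createTopics(List.of(events, latestByUser)).all().get();
}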

Use Cases

  • Real-time analytics: Aggregating and analyzing real-time streaming data from various sources to derive meaningful insights
  • Dynamic data enrichment: Enhancing incoming data streams by joining with reference data or historical records stored in Kafka
  • Fraud detection: Detecting anomalies or fraudulent patterns by correlating events and transactions in real time

Practice!

We keep the default configuration; Kafka stores all our messages in topics, which we can read and work with much like SQL tables.

So, let's imagine that we have two services. The first one processes messages that contain important information for our department. It writes to a topic called "events", with records structured like this:

JSON
 
{
	"product_id": "2e7bca59-406b-48e3-b92c-f0a744bda108",
	"timestamp": 1713648805,
	"category": "OfficeSupplies",
	"app": "GoShop",
	"session_id": "af9dd378-0021-42c8-82ca-9fc1a861a342",
    "user_id": "af9dd378-0021-42c8-82ca-9fc1a861a342"
}


Important fields here:

Shell
 
user_id  // unique UUID v4
category // unique product category name


The second service sends purchase info whenever a user buys something. Example:

JSON
 
{
    "order_id": "0ce3260e-f896-45e2-b8e9-88a4dea8fb7b",
    "user_id": "76046ad5-ffda-4f06-8c1e-730de11f585a",
    "order_time": 1713649496,
    "total_price": 96.9778306289559,
    "products": [
        {
            "product_id": "b19494c5-6913-42a6-aab1-80049334d107",
            "category": "Furniture",
            "price": 42.82092921543266
        },
        {
            "product_id": "20d44dc0-a648-4b49-908f-609ef7bd9e6d",
            "category": "Books",
            "price": 4.742365306930942
        },
        {
            "product_id": "9f2fb735-32b1-4a8a-aeee-d25312d284e3",
            "category": "Electronics",
            "price": 49.41453610659229
        }
    ]
}


We can see that it has a category field as well. Now, we have a relatively simple task: identify all purchases that occurred after a user visited a specific category. For example, a user browses the electronics category and later purchases a new laptop. We have both events, the browsing and the purchase, and our goal is to determine whether they are linked to each other based on the user's user_id.

This time, we'll need to write a small piece of Java code, since Java has been the go-to choice for working with Kafka for years. Kafka and stream processing are my main focus, so feel free to ask any questions about either.

Let’s Start!

Kafka Streams process

  1. Shop stream: Create the first stream from the shop-purchase topic.

    Java
     
    KStream<String, PurchaseRecord> shopStream = kStreamBuilder.stream(
           streamProperty.shopTopic(),
           Consumed.with(Serdes.String(), new JsonSerde<>(PurchaseRecord.class))
    );


  2. Event stream: Create the second stream from the events topic.

    Java
     
    KStream<String, EventRecord> viewStream = kStreamBuilder.stream(
           streamProperty.eventTopic(),
           Consumed.with(Serdes.String(), new JsonSerde<>(EventRecord.class))
    );


  3. Join: Merge the two streams into a new one, keeping only non-null values:

    Java
     
    KStream<String, EnrichedPurchase> enrichedPurchases = shopStream.join(
            viewStream.toTable(), // materialize the view events as a KTable keyed the same way
            (purchase, view) -> {
                if (purchase.userId().equals(view.userId())) { // the same user produced both events
                    // does the purchase contain at least one product from the viewed category?
                    Optional<String> category = purchase.products()
                            .stream()
                            .map(ProductRecord::category)
                            .filter(c -> c.equals(view.category()))
                            .findFirst();

                    if (category.isPresent()) {
                        return new EnrichedPurchase(view, purchase);
                    }
                }
                return null; // no match; dropped by the filter below
            })
            .filter((key, value) -> value != null); // keep only enriched purchases


  4. Result topic: Send the enriched records to the result topic (the overall wiring is sketched after these steps):

    Java
     
    enrichedPurchases.to(
          streamProperty.resultTopic(),
          Produced.with(Serdes.String(), new JsonSerde<>(EnrichedPurchase.class))
    );
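
For completeness, here is a sketch of how a topology like this is typically wired up and started. The application ID, broker address, and shutdown hook are assumptions for illustration; the author's exact configuration lives in the GitHub project linked below:

Java
 
// imports: org.apache.kafka.streams.*, java.util.Properties
StreamsBuilder kStreamBuilder = new StreamsBuilder(); // the builder used in steps 1-4
// ... build shopStream, viewStream, and enrichedPurchases as shown above ...

Properties config = new Properties();
config.put(StreamsConfig.APPLICATION_ID_CONFIG, "purchase-enricher"); // assumed application id
config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumed local broker

KafkaStreams streams = new KafkaStreams(kStreamBuilder.build(), config);
streams.start(); // run the topology until the JVM stops
Runtime.getRuntime().addShutdownHook(new Thread(streams::close)); // close cleanly on shutdown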


As a result, we send:

JSON
 
{
    "event": {
        "product_id": "c49ff516-90fc-4e94-9538-bb0b24d148a9",
        "session_id": "e5adff32-f8c6-4b35-903d-07d85415d681",
        "user_id": "2e5a847b-08bd-48ae-8c42-bee0a5391b30",
        "timestamp": 1713702160,
        "category": "PetSupplies",
        "app": "GoShop"
    },
    "purchase": {
        "order_id": "7f8eb08c-3e77-46b9-b8b1-3f4250c76386",
        "user_id": "2e5a847b-08bd-48ae-8c42-bee0a5391b30",
        "total_price": 440.87111325777965,
        "orderTime": 0,
        "products": [
            {
                "product_id": "4610a39e-ba5e-4034-b45e-7d3dc738e364",
                "category": "PetSupplies",
                "price": 57.415938152296654
            }
        ]
    }
}


The full code is available on GitHub.

Conclusion

Apache Kafka simplifies data discovery and dynamic data integration by providing a unified platform for event streaming and data integration. By leveraging Kafka's scalability, retention policies, and stream processing capabilities, developers can create efficient, scalable, event-driven applications without needing separate databases or complex data pipelines.

Organizations can harness event-driven architecture capabilities to unlock new opportunities in data exploration, integration, and real-time analytics using the Kafka ecosystem, such as Kafka Streams and Kafka Connect.

If you don't need a complex architecture, long-term data storage, or separate databases for various metrics, consider using Kafka Streams. It lets you process data without creating additional structures and enables real-time data reading.

Moreover, processed data from several topics can easily be directed to a new topic for further storage or utilization. This is particularly convenient for performing analytical calculations.

Further Reading

  • Kafka Books and Papers

