Snowflake is a cloud-based data warehousing solution that aims to remove the headaches traditionally associated with business data storage, management, and analytics. In practice, it works as an all-in-one platform for putting data to use in ways traditional setups could only have wished for.

Before we go deep into Snowflake, let's first clarify what a data warehouse is: a system that stores and analyzes large data sets from many sources. The main objective: ensuring that businesses can make decisions based on their data insights. Traditional data solutions are hardware-dependent, complex to deploy, and limited in scalability. Cloud solutions, by contrast, offer the flexibility, scalability, and lower upfront costs that are reshaping how technology adapts to business needs. Snowflake is built around exactly those traits.

Core Components of Snowflake

Database Storage

Snowflake's architecture manages data dynamically, keeping it accessible and secure across multiple cloud platforms and for all data types. Highly effective data compression and partitioning keep storage costs low. Snowflake also provides robust security capabilities, such as always-on encryption and finely tuned access controls, for the highest levels of data integrity and compliance.

Query Processing

Snowflake's compute engines, called "Virtual Warehouses," process queries and provide real-time data processing without lag. Independent compute clusters help scale and optimize performance: each warehouse works independently, allowing users to scale up or down based on performance needs without affecting other operations. Snowflake also offers job prioritization, enabling critical queries to run smoothly while less important ones wait in line.

Cloud Services Layer

This layer supports all core operations, from storage to query execution, ensuring seamless performance and security. It enables live data sharing between groups without moving the data, maximizing collaboration, and it manages background processes so they have no impact on business operations.

Data Management Within Snowflake

Structured and Semi-Structured Data: Loading and Transformation

Snowflake automates data loading and transformation, cutting manual effort while improving accuracy. It readily processes many data formats, both structured and semi-structured (such as JSON), without pushing users toward standalone data transformation tools.

Snowflake Architecture for Warehouse Scalability and Flexibility

Vertical and horizontal scaling: Whether you need more computing power (vertical) or need to handle more operations simultaneously (horizontal), Snowflake scales smoothly.
Matching performance to cost: Resources scale up or down with a few clicks, optimizing performance and controlling costs more effectively.
Elasticity: Snowflake automatically adapts to changes in workload without manual interference, consistently maintaining high performance even during unexpected surges.
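To make the scaling story concrete, here is a minimal sketch of how resizing a warehouse maps to plain SQL issued through Snowflake's JDBC driver. The account URL, credentials, and the warehouse name WH_ANALYTICS are placeholders, and the multi-cluster settings assume an edition that supports them; treat this as an illustration rather than production code.

Java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.Statement;
import java.util.Properties;

public class WarehouseScalingSketch {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("user", "DEMO_USER");         // placeholder credentials
        props.put("password", "********");      // never hard-code a real password
        props.put("warehouse", "WH_ANALYTICS"); // hypothetical warehouse name

        // Account identifier is a placeholder.
        String url = "jdbc:snowflake://myorg-myaccount.snowflakecomputing.com/";

        try (Connection conn = DriverManager.getConnection(url, props);
             Statement stmt = conn.createStatement()) {

            // Vertical scaling: more compute per cluster for heavier queries.
            stmt.execute("ALTER WAREHOUSE WH_ANALYTICS SET WAREHOUSE_SIZE = 'LARGE'");

            // Horizontal scaling: let the warehouse add clusters under concurrency
            // (multi-cluster warehouses require an edition that supports them).
            stmt.execute("ALTER WAREHOUSE WH_ANALYTICS SET MIN_CLUSTER_COUNT = 1 MAX_CLUSTER_COUNT = 4");
        }
    }
}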
Data Cloning and Time Travel

Zero-copy cloning for developers: This capability lets developers clone databases or tables without adding storage costs, shortening testing and development timeframes.
Data retrieval through Time Travel: Time Travel allows you to access and restore data from any point within a configurable past window, which is critical for unexpected data recovery needs.
Putting cloning and Time Travel to work: From basic error correction to historical analysis, these features provide the tools needed to manipulate and manage data effectively.

Integration and Compatibility

Connecting Snowflake with BI tools and ETL systems: Snowflake integrates with a host of third-party BI and ETL tools, simplifying data workflows and improving overall productivity.
API and driver support: Full API and driver support for popular programming languages makes it easy to integrate Snowflake into your tech stack.
Collaboration across platforms and cloud providers: Thanks to Snowflake's enterprise-grade, cloud-agnostic framework, running solutions across Amazon AWS, Microsoft Azure, and Google Cloud is possible without compatibility issues.

Supported Programming Languages

Custom libraries: Snowflake provides connectors and drivers for languages such as Java, Python, and more, which makes the developer experience much more accessible.
Optimization tips for Python, Java, and SQL: Data caching and batch querying can improve performance and reduce latency. Additional techniques include using compressed data formats and appropriate fetch sizes to keep data flowing smoothly and efficiently.

Security and Compliance

Built-in security features: Snowflake supports automatic encryption, network policies, and multi-factor authentication to secure data.
International security compliance: Snowflake adheres to international standards to meet regulatory requirements for data handling, including GDPR.
Best practices in data privacy and security: To enhance security, organizations should adopt practices such as regular audits, role-based access control, and continuous monitoring.

Practical Implementation and Use Cases

Setting up your first Snowflake environment: The initial setup process is user-friendly and accessible even for beginners, covering user roles and security measures.
Configuring initial settings and permissions: Access can be tailored per team, ensuring security measures are maintained.
Tips for efficient data loading and querying: Schedule bulk loads during designated hours to avoid overloading the system while keeping queries responsive.

Cost Management and Optimization

Controlling costs in Snowflake: With Snowflake, you pay for what you use, enabling effective cost management and avoiding resource overcommitment. Built-in analytic tools can track and optimize usage patterns, ensuring cost-effective operations. You can also enable auto-suspend on Virtual Warehouses and improve data clustering to make query performance more efficient.
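As a rough illustration of how these capabilities surface through the driver support mentioned above, here is a hedged sketch that issues zero-copy clone, Time Travel, and auto-suspend statements over JDBC. The table name ORDERS, warehouse WH_ANALYTICS, and connection details are placeholders (a connection can be created as in the earlier sketch), and the retention window available to Time Travel depends on your account settings.

Java
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.Statement;

public class CloneAndTimeTravelSketch {

    // Assumes a Connection obtained the same way as in the scaling sketch above.
    static void run(Connection conn) throws Exception {
        try (Statement stmt = conn.createStatement()) {

            // Zero-copy clone: a writable copy of ORDERS for dev/testing,
            // without duplicating the underlying storage.
            stmt.execute("CREATE TABLE ORDERS_DEV CLONE ORDERS");

            // Time Travel: query ORDERS as it looked one hour ago
            // (the offset is in seconds and must fall inside the retention window).
            try (ResultSet rs = stmt.executeQuery(
                    "SELECT COUNT(*) FROM ORDERS AT(OFFSET => -3600)")) {
                while (rs.next()) {
                    System.out.println("Row count one hour ago: " + rs.getLong(1));
                }
            }

            // Cost control: suspend the warehouse after 60 idle seconds and
            // resume automatically when the next query arrives.
            stmt.execute("ALTER WAREHOUSE WH_ANALYTICS SET AUTO_SUSPEND = 60 AUTO_RESUME = TRUE");
        }
    }
}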
Analytical Insights and Business Intelligence

How companies use Snowflake: Companies of all sizes, from small startups to large enterprises, use Snowflake for scalable analytics that gives their staff actionable insights.
Analytic features that support smart decisions: Features such as data sharing and secure views foster a data-driven culture by giving teams real-time insights.
Predictive analytics and machine learning: Snowflake supports predictive analytics and machine learning integration. For example, it integrates with Spark, letting you fold AI and machine learning capabilities into your workflows.

Future Outlook and Enhancements

Features and development roadmap: Continuous innovation will bring performance, security, and usability enhancements in future versions, typically adding capabilities that change how teams work and improve both productivity and safety.
Community and support: The Snowflake community is active and engaged, letting users share ideas, solutions, and insights. Snowflake backs it with documentation, tutorials, and forums that shorten the learning curve and support operational excellence.

Snowflake Customization

Snowflake is a flexible and powerful platform on which developers build robust custom applications that meet their specific needs. From custom data models to bespoke analytics solutions, developers use Snowflake's features, end-to-end implementations, and case studies of custom solutions built on the platform to create uniquely tailored applications.

Conclusion

This guide walked through Snowflake's architecture, which is powerful, flexible, and approachable. From data storage and management to business intelligence, Snowflake addresses a wide range of data-handling problems. Whether you're just starting out or already managing large volumes of data, Snowflake grows with you through flexible, cost-effective options. Consider Snowflake a pivotal addition to your data strategy, with seamless elasticity, robust security features, and built-in support.
At a high level, bad data is data that doesn't conform to what is expected. For example, an email address without the "@", or a credit card expiry where the MM/YY format is swapped to YY/MM. "Bad" can also include malformed and corrupted data, such that it's completely indecipherable and effectively garbage. In any case, bad data can cause serious issues and outages for all downstream data users, such as data analysts, scientists, engineers, and ML and AI practitioners. In this blog, we'll take a look at how bad data may come to be, and how we can deal with it when it comes to event streams.

Event streams in Apache Kafka are predicated on an immutable log, where data, once written, cannot be edited or deleted (outside of expiry or compaction — more on this later). The benefit is that consumers can read the events independently, at their own pace, and not worry about data being modified after they have already read it. The downside is that it makes it trickier to deal with "bad data," as we can't simply reach in and edit it once it's in there.

In this post, we look at bad data in relation to event streams. How does bad data end up in an event stream? What can we do about it? What's the impact on our downstream consumers, and how can we fix it? First, let's take a look at the batch processing world, to see how they handle bad data and what we can learn from them.

Bad Data in Batch Processing

What is batch processing? Let's quote Databricks, and go with: "Batch processing is a process of running repetitive, high volume data jobs in a group on an ad-hoc or scheduled basis. Simply put, it is the process of collecting, storing and transforming the data at regular intervals."

Batch processing jobs typically rely on extracting data from a source, transforming it in some way, and then loading it into a database (ETL). Alternatively, you can load it into the destination before you transform any of the data, in a recently trendy mode of operations known as ELT (by the way, data lake/warehouse people LOVE this pattern as they get all the $$$ from the transform). A friend and former colleague wrote more about ELTs and ETLs here, so take a look if you want to see another data expert's evaluation. The gist, though, is that we get data from "out there" and bring it into "here", our data lake or warehouse.

Picture the typical pipeline: a periodic batch job kicks off, processes the data that lands in the landing table, and does something useful with it — like figure out how much money the company is owed (or how much your Datalake is costing you). Accomplishing this requires a reliable source of data — but whose job is it to ensure that the data powering the data lake is trustworthy and high quality?

To cut to the chase, the data (or analytics) engineers in the data lake are responsible for getting data from across the company, pulling it in, and then sorting it out into a trustworthy and reliable format. They have little to no control over any of the changes made in production land. Data engineers typically engage in significant break-fix work keeping the lights on and the pipelines up and running. I would know, as I did this type of work for nearly 10 years. We'd typically apply schemas to the data once it lands in the data lake, meaning that changes to the source database table in production land may (likely) break the data sources in the data lake. Data engineers spend nights and weekends fixing malfunctioning data pipelines, broken just hours ago by the 5 pm database migration.
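To make the "apply schemas after landing" step concrete, here is a minimal sketch using Spark's Java API. The field names, lake path, and table name are made up for illustration; the point is simply that the reader, not the producing service, declares what the data is supposed to look like.

Java
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;

public class LandingTableLoad {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .appName("orders-landing-load")
                .enableHiveSupport()
                .getOrCreate();

        // The schema lives here, with the data engineers, not with the producing service.
        StructType ordersSchema = new StructType()
                .add("order_id", DataTypes.StringType)
                .add("amount", DataTypes.DoubleType)
                .add("created_at", DataTypes.TimestampType);

        // A production-side change (say, amount becomes free text) shows up here as
        // nulls or parse failures, and the data engineers are the first to find out.
        Dataset<Row> orders = spark.read()
                .schema(ordersSchema)
                .json("s3://my-data-lake/landing/orders/"); // hypothetical path

        orders.write().mode("overwrite").saveAsTable("landing.orders");
    }
}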
Why are the data engineers responsible for applying schemas? The operational system owners have historically had no responsibility for data once it has crossed out of the source application boundary. Additionally, the data engineers taking the data out of the operational system are performing a "smash 'n grab", taking the data wholesale from the underlying database tables. It's no surprise then that the operational team, with no responsibility for data modeling outside of their system, causes a breakage through a perfectly reasonable database change.

The following figure shows how a broken ETL job (1) can cause bad data to show up in the landing table (2), eventually resulting in (3) bad results. The exact reason for a broken ETL can vary, but let's just say in this case it's due to a change in types in the source database table (an int is now a string) that causes type-checking errors downstream. Once the data engineers spring from their beds and leap into action at 3 in the morning (when all data problems naturally occur), they can proceed to fix the ETL (4) by adding logic to handle the unexpected change. Next, they reprocess the failed batch to fix the landing table data (5), then rerun the job (6) that recomputes the results table (7) for the affected rows.

For instance, say (7) above is a Hive-declared table containing daily sales aggregates. I'm only going to delete and recompute the aggregates that I know (or think) are incorrect. I won't drop the whole table and delete all the data if only a known subset is affected. I'll just surgically remove the bad days (e.g., April 19 to 21, 2024), reprocess (6) above for that time range, and then move on to reprocessing the affected downstream dependencies. Batch processing relies extensively on cutting out bad data and selectively replacing it with good data. You reach right into that great big data set, rip out whatever you deem as bad, and then fill in the gap with good data — via reprocessing or pulling it back out of the source.

Bad Data Contaminates Data Sets

The jobs that are downstream of this now-fixed data set must also be rerun, as their own results are also based on bad input data. The downstream data sets are contaminated, as are any jobs that run off of a contaminated data set. This is actually a pretty big problem in all of data engineering, and is why tools like dbt, and services like Databricks' Delta Tables and Snowflake's Dynamic Tables, are useful — you can force recomputation of all dependent downstream jobs, including dumping bad data sets and rebuilding them from the source. But I digress. The important thing to note here is that once you get bad data into your data lake, it spreads quickly and easily and contaminates everything it touches. I'm not going to solve this problem for you in this blog, but I do want you to be aware that it's not always as simple as "cut out the bad, put in the good!" when fixing bad data sets. The reality is a lot messier, and that's a whole other can of worms that I'm just going to acknowledge as existing and move on.

Incremental Batch Processing

One more word about processing data in batches. Many people have correctly figured out that it's cheaper, faster, and easier to process your data in small increments, and named it incremental processing. An incremental processing job reads in new data and then applies it to its current state based on its business logic.
For example, computing the most popular advertisements in 2024 would simply require a running tally of (advertisementId, clickCount), merging in the new events as they arrive. However, let's say that you had bad data as input to your incremental job — say we've incorrectly parsed some of the click data and attributed them to the wrong advertisementId. To fix our downstream computations we'd have to issue unclick data, telling them "remove X clicks from these ads, then add X clicks to these other ads". While it's possible we could wire up some code to do that, the reality is we're going to keep it simple: stop everything, blow all the bad data away, rebuild it with good data, and then reprocess all the jobs that were affected by it.

"Hold on", you might say. "That's nonsense! Why not just code in removals in your jobs? It can't be that hard". Well… kinda. For some jobs with no dependencies, you may be correct. But consider for a moment if you have a process computing state beyond just simple addition and subtraction, as pretty much all businesses do. Let's say you're computing taxes owed for a corporation, and you're dealing with dozens of different kinds of data sets. The logic for generating the final state of your corporate taxes is winding, treacherous, and not easily reversible. It can be very challenging and risky to code mechanisms to reverse every conceivable state, and the reality is that there will be cases that you simply don't foresee and forget to code. Instead of trying to account for all possible reversal modes, just do what we do with our misbehaving internet routers: unplug it, wipe the state, and start it over. Heck, even dbt encourages this approach for misbehaving incremental jobs, calling it a full_refresh.

Here are the important takeaways as we head into the streaming section:

There is little prevention against bad data: The data engineering space has typically been very reactive. Import data of any and all quality now, and let those poor data engineers sort it out later. Enforced schemas, restrictions on production database migrations, and formalized data contracts between the operations and data plane are rarely used.
The batch world relies on deleting bad data and reprocessing jobs: Data is only immutable until it causes problems, then it's back to the drawing board to remutate it into a stable format. This is true regardless of incremental or full refresh work.

I am often asked, "How do we fix bad data in our Kafka topic?" This is one of the big questions I myself asked as I got into event streaming, as I was used to piping unstructured data into a central location to fix up after the fact. I've definitely learned a lot of what not to do over the years, but the gist is that the strategies and techniques we use for batch-processed data at rest don't transfer well to event streams. For these, we need a different set of strategies for addressing bad data. But before we get to those strategies, let's briefly examine what happens to your business when you have bad data in your system.

Beware the Side Effects of Processing Bad Data

Bad data can lead to bad decisions, both by humans and by services. Regardless of batch processing or streaming, bad data can cause your business to make incorrect decisions. Some decisions are irreversible, but other decisions may not be. For one, reports and analytics built on bad data will disagree with those built on good data. Which one is wrong?
While you're busy trying to figure it out, your customer is losing confidence in your business and may choose to pull out completely from your partnership. While we may call these false reports a side effect, in effect, they can seriously affect how our customers feel about us. Alternatively, consider a system that tabulates vehicle loan payments, but incorrectly flags a customer as non-paying. Those burly men that go to repossess the vehicle don't work for free, and once you figure out you've made a mistake, you'll have to pay someone to go give it back to them.

Any decision-making that relies on bad data, whether batch or streaming, can lead to incorrect decisions. The consequences can vary from negligible to catastrophic, and real costs will accrue regardless of whether it's possible to issue corrective action. You must understand there can be significant negative impacts from using bad data in stream processing, and only some results may be reversible. With all that being said, I won't be able to go into all of the ways you can undo bad decisions made by using bad data. Why? Well, it's primarily a business problem. What does your business do if it makes a bad decision with bad data? Apologize? Refund? Partial refund? Take the item back? Cancel a booking? Make a new booking? You're just going to have to figure out what your business requirements are for fixing bad data. Then, you can worry about the technology to optimize it. But let's just get to it and look at the best strategies for mitigating and dealing with bad data in event streams.

I'm Streaming My Life Away

Event streams are immutable (aside from compaction and expiry). We can't simply excise the bad data and inject corrected data into the space it used to occupy. So what else can we do? The most successful strategies for mitigating and fixing bad data in streams include, in order:

Prevention: Prevent bad data from entering the stream in the first place: schemas, testing, and validation rules. Fail fast and gracefully when data is incorrect.
Event design: Use event designs that let you issue corrections, overwriting previous bad data.
Rewind, rebuild, and retry: When all else fails.

In this blog, we're going to look primarily at prevention, covering the remaining strategies in a follow-up post. But to properly discuss these solutions, we need to explore what kind of bad we're dealing with and where it comes from. So let's take a quick side trip into the main types of bad data you can expect to see in an event stream.

The Main Types of Bad Data in Event Streams

As we go through the types, you may notice a recurring reason for how bad data can get into your event stream. We'll revisit that at the end of this section.

1. Corrupted Data

The data is simply indecipherable. It's garbage. It has turned into a sequence of bytes with no possible way to retrieve the original data. Data corruption is relatively rare but may be caused by faulty serializers that convert data objects into a plain array of bytes for Kafka. Luckily, you can test for that.

2. Event Has No Schema

Someone has decided to send events with no schema. How do you know what's "good data" and what's "bad data" if there is no structure: no types, names, requirements, or limitations?

3. Event Has an Invalid Schema

Your event's purported schema can't be applied to the data. For example, you're using the Confluent Schema Registry with Kafka, but your event's schema ID doesn't correspond to a valid schema.
It is possible you deleted your schema, or that your serializer has inserted the wrong schema ID (perhaps for a different schema registry, in a staging or testing environment?).

4. Incompatible Schema Evolution

You're using a schema (hooray!), but the consumer cannot convert the schema into a suitable format. The event is deserializable, but not mappable to the schema that the consumer expects. This is usually because your source has undergone breaking schema evolution (note that evolution rules vary per schema type), but your consumers have not been updated to account for it.

5. Logically Invalid Value in a Field

Your event has a field with a value that should never be. For example, an array of integers for "first_name", or a null in a field that must never be null (hello, NullPointerException). This error type arises when you are not using a well-defined schema, but simply a set of implicit conventions. It can also arise if you are using an invalid, incomplete, old, or homemade serialization library that ignores parts of your serialization protocol.

6. Logically Valid but Semantically Incorrect

These types of errors are a bit trickier to catch. For example, you may have a serializable string for a "first_name" field (good!), but the name is "Robert'); DROP TABLE Students; --". While little Bobby Tables here is a logically valid answer for a first_name field, it is highly improbable that this is another one of Elon Musk's kids. The data in the entry may even be downright damaging.

Another example is an event with a negative "cost". What is the consumer supposed to do with an order where the cost is negative? This could be a case of a simple bug that slipped through into production, or something more serious. But since it doesn't meet expectations, it's bad data. Some event producer systems are more prone to these types of errors, for example, a service that parses and converts NGINX server logs or customer-submitted YAML/XML files of product inventory into individual events. Malformed sources may be partially responsible for these types of errors.

7. Missing Events

This one is pretty easy. No data was produced, but there should have been something. Right? The nice thing about this type of bad data is that it's fairly easy to prevent via testing. However, it can have quite an impact if only some of the data is missing, making it harder to detect. More on this in a bit.

8. Events That Should Not Have Been Produced

There is no undo button to call back an event once it is published to an event stream. We can fence out one source of duplicates with idempotent production, meaning that intermittent failures and producer retries won't accidentally create duplicates. However, we cannot fence out duplicates that are logically indistinguishable from other events. These types of bad events are typically created due to bugs in your producer code. For example, you may have a producer that creates a duplicate of:

An event that indicates a change or delta ("add 1 to sum"), such that an aggregation of the data leads to an incorrect value.
An analytics event, such as tracking which advertisements a user clicked on. This will also lead to an overinflation of engagement values.
An e-commerce order with its own unique order_id. It may cause a duplicate order to be shipped (and billed) to a customer.

While there are likely more types of bad data in event streams that I may have missed, this should give you a good idea of the types of problems we typically run into.
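As a quick aside on type 8: enabling idempotence on the producer is the standard way to rule out retry-induced duplicates, though it does nothing for logically duplicate events that your own code creates. A minimal sketch, with a made-up topic and payload, might look like this:

Java
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;

public class IdempotentProducerSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");

        // Idempotent production: the broker de-duplicates retried batches, so a
        // timeout-and-retry on the producer side cannot create a duplicate record.
        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
        props.put(ProducerConfig.ACKS_CONFIG, "all"); // required for idempotence

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // If the application itself calls send() twice with a fresh order_id each
            // time, idempotence will not help; that class of duplicate needs tests.
            producer.send(new ProducerRecord<>("orders", "order-123",
                    "{\"order_id\":\"order-123\",\"cost\":19.99}"));
        }
    }
}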
Now let's look at how we can solve these types of bad data, starting with our first strategy: prevention.

Preventing Bad Data With Schemas, Validation, and Tests

Preventing the entry of bad data into your system is the number one approach to making your life better. Diet and exercise are great, but there's no better feeling than watching well-structured data seamlessly propagate through your systems.

First and foremost are schemas. Confluent Schema Registry supports Avro, Protobuf, and JSON Schema. Choose one and use it (I prefer Avro and Protobuf myself). Do yourself, your colleagues, and your future selves a favor. It's the best investment you'll ever make. There are also other schema registries available, though I personally have primarily used the Confluent one over the years (and also, I work at Confluent). But the gist is the same — make it easy to create, test, validate, and evolve your event schemas.

Preventing Bad Data Types 1–5 With Schemas and Schema Evolution

Schemas significantly reduce your error incident rates by preventing your producers from writing bad data, making it far easier for your consumers to focus on using the data instead of making best-effort attempts to parse its meaning. Schemas form a big part of preventing bad data, and it's far, far, far easier to simply prevent bad data from getting into your streams than it is to try to fix it after the damage has already started.

JSON is a lightweight data-interchange format. It is a common yet poor choice for events, because it doesn't enforce types, mandatory and optional fields, default values, or schema evolution. While JSON has its uses, it's not for event-driven architectures. Use an explicitly defined schema such as Avro, Protobuf, or JSON Schema. Going schemaless (aka using plain JSON) is like going around naked in public. Sure, you're "free" of the constraints, boundaries, and limitations, but at what expense? Everyone else has to figure out what the hell is going on, and chaos (and the police) will follow.

But reeling back in the hyperbole, the reality is that your consumers need well-defined data. If you send data with a weak or loose schema, it just puts the onus on the consumer to try to figure out what you actually mean. Let's say we have 2 topics with no schemas and 4 consumers consuming them. So many chances to screw up the data interpretation! There are 8 possible chances that a consumer will misinterpret the data from an event stream. And the more consumers and topics you have, the greater the chance they misinterpret data compared to their peers. Not only will your consumers get loud, world-stopping exceptions, but they may also get silent errors — miscalculating sums and misattributing results, leading to undetected divergence of consumer results. These discrepancies regularly pop up in data engineering, such as when one team's engagement report doesn't match the other team's billing report due to divergent interpretations of unstructured data.

It's worth contrasting this multi-topic, multi-consumer approach with the typical ETL/ELT pipeline into the data plane. In the streaming model, we're not differentiating who uses the data for what purposes. A consumer is a consumer. In contrast, with ETLs, we're typically moving data into one major destination, the data lake (or warehouse), so it's a lot easier to apply a schema to the data after it lands but before any dependent jobs consume it. With streaming, once it's in the stream, it's locked in.
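Here is a rough sketch of what an explicitly schematized producer looks like with Avro and the Confluent serializer. The topic name, field names, and registry URL are placeholders; the point is that a record that doesn't match the registered schema fails at serialization time, on the producer side, before it ever reaches the stream.

Java
import java.util.Properties;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;

public class SchematizedProducerSketch {
    private static final String ORDER_SCHEMA =
        "{ \"type\": \"record\", \"name\": \"Order\", \"fields\": ["
      + "  { \"name\": \"order_id\", \"type\": \"string\" },"
      + "  { \"name\": \"cost\", \"type\": \"double\" } ] }";

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");
        // The Avro serializer registers/validates the schema against the registry
        // and refuses to serialize records that don't conform to it.
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                "io.confluent.kafka.serializers.KafkaAvroSerializer");
        props.put("schema.registry.url", "http://localhost:8081"); // placeholder

        Schema schema = new Schema.Parser().parse(ORDER_SCHEMA);
        GenericRecord order = new GenericData.Record(schema);
        order.put("order_id", "order-123");
        order.put("cost", 19.99); // putting a String here would fail at serialization time

        try (KafkaProducer<String, GenericRecord> producer = new KafkaProducer<>(props)) {
            producer.send(new ProducerRecord<>("orders", "order-123", order));
        }
    }
}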
Implicit schemas, historical conventions, and tribal knowledge are unsuitable for providing data integrity. Use a schema, make it strict, and reduce your consumers' exposure to unintentional data issues. Once adopted, you can rely on your CI/CD pipelines to perform schema, data, and evolution validation before deploying. The result? No more spewing bad data into your production streams.

Data Quality Rules: Handling Type 6 (Logically Valid but Semantically Incorrect)

While many of the "bad data" problems can be avoided by using schemas, they are only a partial solution for this type. Sure, we can enforce the correct type (so no more storing strings in integer fields), but we can't guarantee the specific semantics of the data. So what can we do, aside from using a schema?

Write producer unit tests.
Throw exceptions if the data is malformed (e.g., if the phone number is longer than X digits).
Rely on data contracts and data quality rules (note: JSON Schema also has some built-in data quality rules).

Here's an example of a Confluent data quality rule for a US Social Security Number (SSN).

JSON
{
  "schema": "…",
  "ruleSet": {
    "domainRules": [
      {
        "name": "checkSsnLen",
        "kind": "CONDITION",
        "type": "CEL",
        "mode": "WRITE",
        "expr": "size(message.ssn) == 9"
      }
    ]
  }
}

This rule enforces an exact length of 9 characters for the SSN. If it were an integer, we could also enforce that it must be positive, and if it is a string, that it must only contain numeric characters. The data quality checks are applied when the producer attempts to serialize data into a Kafka record. If the message.ssn field is not exactly 9 characters in length, then the serializer will throw an exception. Alternatively, you can send the record to a dead-letter queue (DLQ) upon failure.

Approach DLQ usage with caution. Simply shunting the data into a side stream means that you'll still have to deal with it later, typically by repairing it and resending it. DLQs work best where each event is completely independent, with no relation to any other event in the stream, and ordering is not important. Otherwise, you run the risk of presenting an error-free yet incomplete stream of data, which can also lead to its own set of miscalculations and errors! Don't get me wrong. DLQs are a good choice in many scenarios, but they should truly be a last-ditch effort at preventing bad data from getting into a stream. Try to ensure that you test, trial, and foolproof your producer logic to publish your record to Kafka correctly the first time.

Testing — Handling Types 7 (Missing Data) and 8 (Data That Shouldn't Have Been Produced)

Third in our trio of prevention heroes is testing. Write unit and integration tests that exercise your serializers and deserializers, including schema formats (validate against your production schema registry), data validation rules, and the business logic that powers your applications. Integrate producer testing with your CI/CD pipeline so that your applications go through a rigorous evaluation before they're deployed to production. Both Type 7 (missing data) and Type 8 (data that shouldn't have been produced) are actually pretty easy to test against. One of the beautiful things about event-driven systems is that it's so easy to test them. For integration purposes, you simply produce events on the inputs, wait to see what comes out of the outputs, and evaluate accordingly. Once you find the bug in your logic, write another test to ensure that you don't get a regression.

Summary

Bad data can creep into your data sets in a variety of ways.
Data at rest consists of partitioned files typically backed by a Parquet or ORC format. The data is both created and read by periodically executed batch processes. The files are mutable, which means that bad data can be fixed in place and overwritten, or it can be deleted and regenerated by the upstream batch job. Kafka topics, in contrast, are immutable. Once a bad event is written into the event stream, it cannot be surgically removed, altered, or overwritten. Immutability is not a bug but a feature — every consumer gets the same auditable data. But this feature requires you to be careful and deliberate about creating your data. Good data practices prevent you from getting into trouble in the first place. Write tests, use schemas, use data contracts, and follow schema evolution rules. After your initial investment, you and your colleagues will save so much time, effort, and break-fix work that you’ll actually have time to do some of the fun stuff with data — like getting actual work done. Prevention is the single most effective strategy for dealing with bad data. Much like most things in life, an ounce of prevention is worth a pound of cure (or 28.3g of prevention and 454g of cure for those of us on the metric system). In the next post, we’ll take a look at leveraging event design as part of handling bad data in event streams. There are many ways you can design your events, and we’ll look at a few of the most popular ways and their tradeoffs. Don’t touch that dial, we’ll be right back (yes, TVs used to have dials, and needed time to warm up).
Real-time data is no longer a nice-to-have, but a must-have when creating relevant and engaging user experiences. Most industries today have grown accustomed to consuming instant updates, so if you're a front-end developer looking to break into real-time app development, you'll need to master the flow of real-time data.

As a developer advocate at Redpanda, my job is to help developers leverage streaming data in their applications. Part of that involves introducing developers to better technologies and showcasing them in practical and fun use cases. So, in this post, I'll demonstrate how I used three modern technologies — Redpanda Serverless, Pusher, and Vercel — to create a compelling real-time frontend application, which I hope will spark your own ideas for how you can implement this powerful trio in your world.

The Cupcake Conundrum

Imagine a bustling cupcake business in NYC. To reel in customers in such a competitive market, they'll need to make their location readily visible to nearby cupcake fans. They'll also need to engage their customers via immediate feedback to build trust, enhance the overall user experience, and drive repeat business and customer loyalty. However, developing real-time applications has traditionally been difficult, as most applications are designed to respond to user inputs rather than continuously listen for and process incoming data streams. The latter requires a robust and complex infrastructure to manage persistent connections and handle high volumes of data with minimal latency. For the visual learners, here's a quick video explaining the use case.

Selecting the Right Technologies

I chose Redpanda Serverless as the streaming data platform since traditional streaming data solutions, like Apache Kafka®, can be complex and resource-intensive, making them a massive hurdle for teams with limited time and resources. Some considerations when running the platform:

Eliminates infrastructure overhead: It manages the backbone of streaming data, allowing me to focus on application logic.
Simplifies scalability: Effortlessly scales with my application's needs, accommodating spikes in data without manual intervention.
Reduces time to market: With a setup time of seconds, it speeds up development for quicker iterations and feedback.
Pay as you grow: It adapts to my usage, ensuring costs align with my actual data processing needs, which is ideal for startups and small projects.

This takes care of the complex infrastructure for dealing with high volumes of data and the low latency that's expected of real-time applications. Now, I need to establish a single, long-lived connection between the browser and the server, typically done through WebSocket, which sets up a full-duplex communication channel over an HTTP connection. This allows the server to push updates to the browser client without needing periodic requests. However, Vercel doesn't support WebSocket, so I needed an alternative solution. Here's where Pusher pops up. Pusher lets me create real-time channels between the server and client, simplifying the complexity associated with directly using WebSocket.

When deploying real-time frontend applications, Vercel stands out for its seamless Git repository integration that makes deployments easy. With a push of a button, changes are automatically updated, and I can get website statistics and data from other solutions (like databases) when needed.
Preparing the Application

In my application, mapview.js acts as a Vercel serverless function, which plays the most important role by consuming data from the topic I created in Redpanda Serverless and then updating the inventory status. Before using Pusher to relay these updates to the front end, the serverless function maps the store IDs in store_nyc.csv to their physical locations and adds the location information (latitude and longitude) that the client needs to render.

JavaScript
await consumer.run({
  eachMessage: async ({ topic, partition, message }) => {
    const messageData = JSON.parse(message.value.toString());
    const location = storeLocations[messageData.store];
    const { store, ...rest } = messageData;

    // Mark all previously received entries as stale.
    for (let store in inventory) {
      inventory[store].latest = false;
    }
    inventory[messageData.store] = { ...rest, ...location, latest: true };

    try {
      pusher.trigger("my-channel", channelId, JSON.stringify(inventory));
    } catch (error) {
      console.error('Error:', error);
    }
  },
})

Note: Vercel serverless functions have a maximum duration limit, which varies depending on your subscription plan, so I set MAX_BLOCK_TIME to five seconds. The Pro plan allows up to 300 seconds of execution for a better user experience.

JavaScript
await new Promise(resolve => setTimeout(resolve, MAX_BLOCK_TIME));

On the front end, index.html presents the real-time map using the LeafletJS libraries and inventory updates, giving the end users a dynamic and interactive experience.

JavaScript
channel.bind('cupcake-inv', function(data) {
  var inventory = data;
  tableBody.innerHTML = '';
  for (var store in inventory) {
    var storeData = inventory[store];
    if (markers[store]) {
      markers[store].setLatLng([storeData.lat, storeData.lng])
        .setPopupContent(`<b>${storeData.store}</b><br>Blueberry: ${storeData.blueberry}<br>Strawberry: ${storeData.strawberry}`);
    } else {
      markers[store] = L.marker([storeData.lat, storeData.lng]).addTo(map)
        .bindPopup(`<b>${storeData.store}</b><br>Blueberry: ${storeData.blueberry}<br>Strawberry: ${storeData.strawberry}`);
    }
  }
});

It also generates a unique session ID per session to create channels in Pusher, so each session will have its unique channel to receive updates.

JavaScript
channel.bind(uniqueChannelId, function(data) {
  var inventory = data;
  for (var store in inventory) {
    var storeData = inventory[store];
    ……

document.addEventListener('DOMContentLoaded', () => {
  fetch(`/api/mapview?channelId=${encodeURIComponent(uniqueChannelId)}`)

The Recipe: Real-Time Cupcake Updates With Redpanda Serverless, Vercel, and Pusher

It's time to start cooking! Here's a step-by-step breakdown of how I brought this vision to life, which you can follow. If you want to skip ahead, you can find all the code in this GitHub repository.

Step 1: Set up Redpanda Serverless

Sign up and create the cluster: After signing up, click the Create Cluster button and select a region close to your workload, ensuring low latency for your data.
Create the user and set permissions: Under the Security tab, create a new user and set the necessary permissions.
Create the topic: Create a topic called inv-count that's dedicated to tracking cupcake stock updates.

Step 2: Integrate Pusher for Real-Time Updates

Register the application: After creating an app within Pusher, copy the application credentials, including the app_id, key, secret, and cluster information, and store them for use in your application.
Step 3: Deploy With Vercel

Integrate with GitHub: Push the updated codebase to a GitHub repository, ensuring your changes are version-controlled and ready for deployment.
Import and set up the project in Vercel: Navigate to Vercel and import the project by selecting the "cupcakefanatic" repository. Specify cupcake-pusher as the root directory for the deployment.
Configure the environment: Enter the project-specific environment variables.

With that, I can establish a seamless real-time connection between the server and clients, enhancing the store's online presence and user engagement — without the heavy lifting traditionally associated with real-time streaming data. Below is a screenshot of the resulting real-time data in our cupcake app. With the winning combination of Redpanda Serverless, Pusher, and Vercel, I easily created a dynamic, responsive application that keeps customers informed and engaged with live inventory updates. If you have questions, ask me in the Redpanda Community on Slack; I'm there most of the time. :)
Businesses can react quickly and effectively to user behavior patterns by using real-time analytics. This allows them to take advantage of opportunities that might otherwise pass them by and prevent problems from getting worse. Apache Kafka, a popular event streaming platform, can be used for real-time ingestion of data/events generated from various sources across multiple verticals such as IoT, financial transactions, inventory, etc. This data can then be streamed into multiple downstream applications or engines for further processing and eventual analysis to support decision-making.

Apache Flink serves as a powerful engine for refining or enhancing streaming data by modifying, enriching, or restructuring it upon arrival at the Kafka topic. In essence, Flink acts as a downstream application that continuously consumes data streams from Kafka topics for processing, and then ingests the processed data into various Kafka topics. Eventually, Apache Druid can be integrated to consume the processed streaming data from Kafka topics for analysis, querying, and making instantaneous business decisions.

In my previous write-up, I explained how to integrate Flink 1.18 with Kafka 3.7.0. In this article, I will outline the steps to transfer processed data from Flink 1.18.1 to a Kafka 2.13-3.7.0 topic. A separate article detailing the ingestion of streaming data from Kafka topics into Apache Druid for analysis and querying was published a few months ago; you can read it here.

Execution Environment

We configured a multi-node cluster (three nodes) where each node has a minimum of 8 GB RAM and a 250 GB SSD, with Ubuntu 22.04.2 amd64 as the operating system. OpenJDK 11 is installed with the JAVA_HOME environment variable configured on each node, and Python 3 or Python 2 along with Perl 5 is available on each node.
A three-node Apache Kafka 3.7.0 cluster is up and running with Apache ZooKeeper 3.5.6 on two nodes.
Apache Druid 29.0.0 has been installed and configured on the node in the cluster where ZooKeeper has not been installed for the Kafka broker; ZooKeeper has been installed and configured on the other two nodes. The leader broker is up and running on the node where Druid is running.
We developed a simulator using the Datafaker library to produce fake financial transaction JSON records in real time at 10-second intervals and publish them to the created Kafka topic. Here is the JSON data feed generated by the simulator:

JSON
{"timestamp":"2024-03-14T04:31:09Z ","upiID":"9972342663@ybl","name":"Kiran Marar","note":" ","amount":"14582.00","currency":"INR","geoLocation":"Latitude: 54.1841745 Longitude: 13.1060775","deviceOS":"IOS","targetApp":"PhonePe","merchantTransactionId":"ebd03de9176201455419cce11bbfed157a","merchantUserId":"65107454076524@ybl"}

The Apache Flink 1.18.1 archive (flink-1.18.1-bin-scala_2.12.tgz) was extracted on the node where neither Druid nor the Kafka leader broker is running.

Running a Streaming Job in Flink

We will dig into the process of extracting data from a Kafka topic where incoming messages are being published from the simulator, performing processing tasks on it, and then reintegrating the processed data back into a different topic of the multi-node Kafka cluster. We developed a Java program (StreamingToFlinkJob.java) that was submitted as a job to Flink to perform the above-mentioned steps, considering a 2-minute window and calculating the average amount transacted from the same mobile number (UPI ID) on the simulated UPI transactional data stream.
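The UPITransaction POJO and the KafkaUPISchema deserializer referenced in the code below are not included in this excerpt. Here is a hypothetical sketch of what they might look like, with field names chosen to match the later snippets (upiID, eventTime, amount); the real classes in the author's project may differ.

Java
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.IOException;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;

// Simplified transaction POJO; only the fields used by the windowing logic are kept.
class UPITransaction {
    public String upiID;
    public String eventTime; // "timestamp" field of the simulator feed
    public int amount;       // simplified to a whole number, as in the aggregation code
}

// JSON deserialization schema so the Flink KafkaSource can turn raw bytes into POJOs.
class KafkaUPISchema implements DeserializationSchema<UPITransaction> {
    private static final ObjectMapper MAPPER = new ObjectMapper();

    @Override
    public UPITransaction deserialize(byte[] bytes) throws IOException {
        JsonNode node = MAPPER.readTree(bytes);
        UPITransaction tx = new UPITransaction();
        tx.upiID = node.get("upiID").asText();
        tx.eventTime = node.get("timestamp").asText().trim();
        // The simulator emits the amount as a string such as "14582.00".
        tx.amount = (int) Double.parseDouble(node.get("amount").asText());
        return tx;
    }

    @Override
    public boolean isEndOfStream(UPITransaction nextElement) {
        return false; // unbounded stream
    }

    @Override
    public TypeInformation<UPITransaction> getProducedType() {
        return TypeInformation.of(UPITransaction.class);
    }
}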
The necessary connector and client jar files have been included on the project build path or classpath. Using the code below, we can get the Flink execution environment inside the developed Java class.

Java
Configuration conf = new Configuration();
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);

Now we should read the messages/stream that has already been published by the simulator to the Kafka topic inside the Java program. Here is the code block:

Java
KafkaSource<UPITransaction> kafkaSource = KafkaSource.<UPITransaction>builder()
    .setBootstrapServers(IKafkaConstants.KAFKA_BROKERS) // IP address with port 9092 where the leader broker is running in the cluster
    .setTopics(IKafkaConstants.INPUT_UPITransaction_TOPIC_NAME)
    .setGroupId("upigroup")
    .setStartingOffsets(OffsetsInitializer.latest())
    .setValueOnlyDeserializer(new KafkaUPISchema())
    .build();

To retrieve information from Kafka, setting up a deserialization schema within Flink is crucial for processing events in JSON format, converting raw data into a structured form. Importantly, setParallelism needs to be set to the number of Kafka topic partitions; otherwise, the watermark won't advance for the source, and data is not released to the sink.

Java
DataStream<UPITransaction> stream = env.fromSource(
    kafkaSource,
    WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofMinutes(2)),
    "Kafka Source").setParallelism(1);

With successful event retrieval from Kafka, we can enhance the streaming job by incorporating processing steps. The subsequent code snippet reads Kafka data, organizes it by mobile number (upiID), and computes the average amount per mobile number. To accomplish this, we developed a custom window function for calculating the average and implemented watermarking to handle event-time semantics effectively. Here is the code snippet:

Java
SerializableTimestampAssigner<UPITransaction> sz = new SerializableTimestampAssigner<UPITransaction>() {
    @Override
    public long extractTimestamp(UPITransaction transaction, long l) {
        try {
            SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss'Z'");
            Date date = sdf.parse(transaction.eventTime);
            return date.getTime();
        } catch (Exception e) {
            return 0;
        }
    }
};

WatermarkStrategy<UPITransaction> watermarkStrategy = WatermarkStrategy
    .<UPITransaction>forBoundedOutOfOrderness(Duration.ofMillis(100))
    .withTimestampAssigner(sz);

DataStream<UPITransaction> watermarkDataStream = stream.assignTimestampsAndWatermarks(watermarkStrategy);

// Instead of event time, we could window on processing time by using TumblingProcessingTimeWindows.
DataStream<TransactionAgg> groupedData = watermarkDataStream
    .keyBy("upiID")
    .window(TumblingEventTimeWindows.of(Time.minutes(2)))
    .apply(new TransactionAgg());

Eventually, the processing logic (computation of the average amount for the same UPI ID or mobile number over a 2-minute window on the continuous transaction stream) is executed inside Flink. Here is the code block for the window function that calculates the average amount for each UPI ID or mobile number.
Java
public class TransactionAgg implements WindowFunction<UPITransaction, TransactionAgg, Tuple, TimeWindow> {

    @Override
    public void apply(Tuple key, TimeWindow window, Iterable<UPITransaction> values, Collector<TransactionAgg> out) {
        Integer sum = 0; // consider whole numbers only
        int count = 0;
        String upiID = null;
        for (UPITransaction value : values) {
            sum += value.amount;
            upiID = value.upiID;
            count++;
        }

        TransactionAgg output = new TransactionAgg();
        output.upiID = upiID;
        output.eventTime = window.getEnd();
        output.avgAmount = (sum / count);
        out.collect(output);
    }
}

We have processed the data. The next step is to serialize the object and send it to a different Kafka topic. We add a KafkaSink in the developed Java code (StreamingToFlinkJob.java) to send the processed data from the Flink engine to a different Kafka topic created on the multi-node Kafka cluster. Here is the code snippet to serialize the object before sending/publishing it to the Kafka topic:

Java
public class KafkaTrasactionSinkSchema implements KafkaRecordSerializationSchema<TransactionAgg> {

    private final String topic;
    private final ObjectMapper objectMapper = new ObjectMapper();

    // The target topic is passed in when the sink schema is constructed (see below).
    public KafkaTrasactionSinkSchema(String topic) {
        this.topic = topic;
    }

    @Override
    public ProducerRecord<byte[], byte[]> serialize(TransactionAgg aggTransaction, KafkaSinkContext context, Long timestamp) {
        try {
            return new ProducerRecord<>(
                topic,
                null, // partition not specified, so setting null
                aggTransaction.eventTime,
                aggTransaction.upiID.getBytes(),
                objectMapper.writeValueAsBytes(aggTransaction));
        } catch (Exception e) {
            throw new IllegalArgumentException("Exception on serialize record: " + aggTransaction, e);
        }
    }
}

And below is the code block that sinks the processed data back to a different Kafka topic:

Java
KafkaSink<TransactionAgg> sink = KafkaSink.<TransactionAgg>builder()
    .setBootstrapServers(IKafkaConstants.KAFKA_BROKERS)
    .setRecordSerializer(new KafkaTrasactionSinkSchema(IKafkaConstants.OUTPUT_UPITRANSACTION_TOPIC_NAME))
    .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
    .build();

groupedData.sinkTo(sink); // the DataStream of TransactionAgg created above
env.execute();

Connecting Druid With the Kafka Topic

In this final step, we need to integrate Druid with the Kafka topic to consume the processed data stream that is continuously published by Flink. With Apache Druid, we can connect directly to Apache Kafka so that real-time data can be ingested continuously and subsequently queried to make business decisions on the spot, without involving any third-party system or application. Another beauty of Apache Druid is that we need not configure or install any third-party UI application to view the data that lands on or is published to the Kafka topic. To keep this article concise, I omitted the steps for integrating Druid with Apache Kafka. However, a few months ago, I published an article on this topic (linked earlier in this article); you can read it and follow the same approach.

Final Note

The provided code snippets are for understanding purposes only. They illustrate the sequential steps of obtaining messages/data streams from a Kafka topic, processing the consumed data, and eventually sending/pushing the modified data into a different Kafka topic, which allows Druid to pick up the modified data stream for querying and analysis as a final step. Later, we will upload the entire codebase to GitHub if you are interested in executing it on your own infrastructure. I hope you enjoyed reading this. If you found this article valuable, please consider liking and sharing it.
Introductory note: This article has been co-authored by Federico Trotta and Karin Wolok.

Introduction

Stream processing is a distributed computing paradigm that supports the gathering, processing, and analysis of high-volume, continuous data streams to extract insights in real time. As we're living in a world where more and more data "is born" as streams, allowing analysts to extract insights in real time for faster business decisions, we wanted to offer a gentle introduction to this topic.

Table of Contents

Defining and understanding stream processing
Computing data in stream processing
Transformations in stream processing
Stream processing use cases
How to deal with stream processing: introducing streaming databases and systems
Conclusions

Defining and Understanding Stream Processing

Stream processing is a programming paradigm that views streams — also called "sequences of events in time" — as the central input and output objects of computation.

Stream processing. Image by Federico Trotta.

Unlike traditional batch processing — which handles data in chunks — stream processing allows us to work with data as a constant flow, making it ideal for scenarios where immediacy and responsiveness are fundamental.

Batch processing. Image by Federico Trotta.

So, stream processing means computing and processing data in motion and in real time. This is important in today's world because the majority of data today "is born" as continuous streams.

A stream of data. Image by Federico Trotta.

Think about cases like sensor events, user activity on a website, or financial trades. Sensors, for example, continuously collect data from the environment in which they are installed, in real time. This powers applications in weather monitoring, industrial automation, IoT devices, and many more. Websites, in turn, are platforms where users dynamically engage in various activities such as clicking, scrolling, and typing. These continuously generated interactions are captured in real time to analyze user behavior, improve user experience, and make timely adjustments, just to mention a few activities. In financial markets, transactions occur rapidly and continuously. Stock prices, currency values, and other financial metrics are constantly changing and require real-time data processing to support rapid decisions.

So, the objective of stream processing is to quickly analyze, filter, transform, or enhance data in real time. Once processed, the data is passed off to an application, data store, or another stream processing engine.

Computing Data in Stream Processing

Given the nature of stream processing, data is processed differently than in batch processing. Let's discuss how.

Incremental Computation

Incremental computation is a technique used to optimize computational processes, allowing them to process only the parts of the data that have changed since a previous computation, rather than recomputing the entire result from scratch. This technique is particularly useful for saving computational resources.

Incremental computation. Image by Federico Trotta.

In the context of stream processing, incremental computation involves updating results continuously as new data arrives, rather than recalculating everything from scratch each time. To give an example, let's say we have a stream of sensor data from IoT devices measuring temperature in a factory.
We want to calculate the average temperature over a sliding window of time, such as the average temperature over the last 5 minutes, and we want to update this average in real time as new sensor data arrives. Instead of recalculating the entire average every time new data arrives, which could be computationally expensive, especially as the dataset grows, we can use incremental computation to update the average efficiently. Transformations in Stream Processing In the context of stream processing, we can apply different sets of transformations to an incoming data stream. Some of them are typical transformations we can always make on data, while others are specific to the “real-time case.” Among others, we can mention the following methodologies: Filtering: This is the act of removing unwanted data from the whole data stream, based on specified conditions. Data filtering. Image by Federico Trotta. Aggregation: This involves summing, averaging, counting, or computing any other statistical measure over specific intervals to gain insight from a given stream of data. Data aggregation. Image by Federico Trotta. Enrichment: It’s the act of enhancing the incoming data by adding additional information from external sources. This process can provide more context to the data, making it more valuable for downstream applications. Data enrichment. Image by Federico Trotta. Transformation: Data transformation is a general way to modify data formats. With this methodology, we apply various transformations to the data, such as converting data formats, normalizing values, or extracting specific fields, ensuring that the data is in the desired format for further analysis or integration. Data transformation. Image by Federico Trotta. Windowing: It’s the act of dividing the continuous stream into discrete windows of data. This allows us to analyze data within specific time frames, enabling the detection of trends and patterns over different intervals. Windowing. Image by Federico Trotta. Load balancing and scaling: It’s a way to distribute the processing load across multiple nodes to achieve scalability. Stream processing frameworks, in fact, often support parallelization and distributed computing to handle large volumes of data efficiently. The process of load balancing and scaling. Image by Federico Trotta. Stateful stream processing: It’s a type of transformation that involves processing a continuous stream of data in real time while also maintaining the current state of the data that has been processed. This allows the system to process each event as it is received while also tracking changes in the data stream over time, by including the history and the context of the data. Stateful processing. Image by Federico Trotta. Pattern recognition: This is a kind of transformation that searches for a set of event patterns. It is particularly interesting in the case of data streams because, with the data in continuous flow, it can also intercept anomalies in the flow. Pattern recognition. Image by Federico Trotta. Stream Processing Use Cases To gain a deeper understanding of stream processing, let’s consider some real-world use cases. Use Case 1: Real-Time Fraud Detection in Financial Transactions Due to the increase in online transactions, fraudsters are becoming increasingly sophisticated, and detecting fraudulent activities in real time is crucial to prevent financial losses and protect customers.
In this case, stream processing is applied to analyze incoming transactions, looking for patterns or anomalies that might indicate fraudulent behaviors. For example, if someone clones your credit card and buys something from a location on the other side of the world from where you reside, the system recognizes a fraudulent transaction thanks to stream processing (and Machine Learning). Use Case 2: IoT Data Analytics for Predictive Maintenance In today’s interconnected world, devices and sensors generate vast amounts of data. For industries like manufacturing, predicting equipment failures and performing proactive maintenance are critical to minimize downtime and reduce operational costs. In this case, stream processing is employed to analyze data streams from IoT sensors, applying algorithms for anomaly detection, trend analysis, and pattern recognition. This helps companies with the early identification of potential equipment failures or deviations from normal operating conditions. Use Case 3: Anomaly Detection and Handling in Trading Systems Trading systems generate a constant stream of financial data, including stock prices, trading volumes, and other market indicators. Since anomaly detection is a crucial aspect of trading systems, helping to identify unusual patterns or behaviors in financial data that may indicate potential issues, errors, or fraudulent activities, sophisticated algorithms are employed to analyze the data stream in real time and identify patterns that deviate from normal market behavior. Use Case 4: Advertising Analytics In advertising analytics, the use of data streams is fundamental in gathering, processing, and analyzing vast amounts of real-time data to optimize ad campaigns, understand user behavior, and measure the effectiveness of advertising efforts. Data streams allow advertisers to monitor ad campaigns in real time, and metrics such as impressions, clicks, conversions, and engagement can be continuously tracked, providing immediate insights into the performance of advertising assets. How To Deal with Stream Processing: Introducing Streaming Databases and Systems As stream processing involves dealing with data in real time, they have to be managed differently from how data in batches are. What we mean is that the “classical” database is no longer sufficient. What’s really needed is “an ecosystem”: not just a database that can manage data “on the fly.” In this section, we introduce some topics and concepts that will be deepened in upcoming, deep articles, on streaming systems and databases. Ksqldb Apache Kafka “is an open-source distributed event streaming platform used by thousands of companies for high-performance data pipelines, streaming analytics, data integration, and mission-critical applications.” Kafka, in particular, is an ecosystem that provides a streaming database called ‘ksqldb,’ but also tools and integrations to help data engineers implement a stream processing architecture to existing data sources. Apache Flink Apache Flink “is a framework and distributed processing engine for stateful computations over unbounded and bounded data streams. Flink has been designed to run in all common cluster environments, and perform computations at in-memory speed and at any scale.” In particular, Apache Flink is a powerful stream processing framework suitable for real-time analytics and complex event processing, while Kafka is a distributed streaming platform primarily used for building real-time data pipelines. 
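Before moving on, it may help to make the incremental windowed computation from earlier concrete. The snippet below is a minimal, framework-agnostic Python sketch (an illustration of the idea only, not Flink, Kafka, or RisingWave code); the class name, the 5-minute window, and the sample readings are arbitrary choices, and events are assumed to arrive in timestamp order.

Python
from collections import deque
from datetime import datetime, timedelta

class SlidingAverage:
    """Incrementally maintains the average of readings seen in the last `window` of time."""

    def __init__(self, window=timedelta(minutes=5)):
        self.window = window
        self.events = deque()  # (timestamp, value) pairs currently inside the window
        self.total = 0.0       # running sum, adjusted incrementally

    def add(self, timestamp, value):
        # Incorporate the new reading without recomputing the whole sum.
        self.events.append((timestamp, value))
        self.total += value
        # Evict readings that fell out of the window, again adjusting the sum incrementally.
        while self.events and self.events[0][0] < timestamp - self.window:
            _, old_value = self.events.popleft()
            self.total -= old_value
        return self.total / len(self.events)

# Usage: feed sensor events as they arrive and read the up-to-date average after each one.
avg = SlidingAverage()
start = datetime.now()
for i, temp in enumerate([21.0, 21.5, 22.0, 35.0]):
    print(avg.add(start + timedelta(seconds=30 * i), temp))

Each arriving event costs a roughly constant amount of work, which is exactly the property stream processors exploit to keep windowed results fresh without rescanning history.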
RisingWave RisingWave “is a distributed SQL streaming database that enables simple, efficient, and reliable processing of streaming data.” RisingWave reduces the complexity of building stream-processing applications by allowing developers to express intricate stream-processing logic through cascaded materialized views. Furthermore, it allows users to persist data directly within the system, eliminating the need to deliver results to external databases for storage and query serving. In particular, RisingWave can gather data in real time from various sources, such as sensors and devices, social media apps, websites, and more. Conclusions As a gentle introduction to stream processing, in this article, we’ve defined what stream processing is, how it differs from batch processing, and how computation works in stream processing. We've also presented some use cases to show how the theory of stream processing applies to real-world examples like manufacturing and finance. Finally, we introduced some solutions for implementing stream processing. In upcoming articles, we'll describe what stream processing systems and streaming databases are and how to pick the one that suits your business needs.
For years, JDBC and ODBC have been commonly adopted norms for database interaction. Now, as we gaze upon the vast expanse of the data realm, the rise of data science and data lake analytics brings bigger and bigger datasets. Correspondingly, we need faster and faster data reading and transmission, so we start to look for better answers than JDBC and ODBC. Thus, we include the Arrow Flight SQL protocol in Apache Doris 2.1, which provides tens-fold speedups for data transfer. High-Speed Data Transfer Based on Arrow Flight SQL As a column-oriented data warehouse, Apache Doris arranges its query results in the form of data Blocks in a columnar format. Before version 2.1, the Blocks must be serialized into bytes in row-oriented formats before they can be transferred to a target client via a MySQL client or JDBC/ODBC driver. Moreover, if the target client is a columnar database or a column-oriented data science component like Pandas, the data should then be de-serialized. The serialization-deserialization process is a speed bump for data transmission. Apache Doris 2.1 has a data transmission channel built on Arrow Flight SQL. (Apache Arrow is a software development platform designed for high data movement efficiency across systems and languages, and the Arrow format aims for high-performance, lossless data exchange.) It allows high-speed, large-scale data reading from Doris via SQL in various mainstream programming languages. For target clients that also support the Arrow format, the whole process will be free of serialization/deserialization, thus no performance loss. Another upside is, that Arrow Flight can make full use of multi-node and multi-core architecture and implement parallel data transfer, which is another enabler of high data throughput. For example, if a Python client reads data from Apache Doris, Doris will first convert the column-oriented Blocks to Arrow RecordBatch. Then in the Python client, Arrow RecordBatch will be converted to Pandas DataFrame. Both conversions are fast because the Doris Blocks, Arrow RecordBatch, and Pandas DataFrame are all column-oriented. In addition, Arrow Flight SQL provides a general JDBC driver to facilitate seamless communication between databases that support the Arrow Flight SQL protocol. This unlocks the potential of Doris to be connected to a wider ecosystem and to be used in more cases. Performance Test The "tens-fold speedups" conclusion is based on our benchmark tests. We tried reading data from Doris using PyMySQL, Pandas, and Arrow Flight SQL, and jotted down the durations, respectively. The test data is the ClickBench dataset. Results on various data types are as follows: As shown, Arrow Flight SQL outperforms PyMySQL and Pandas in all data types by a factor ranging from 20 to several hundred. Usage With support for Arrow Flight SQL, Apache Doris can leverage the Python ADBC Driver for fast data reading. I will showcase a few frequently executed database operations using the Python ADBC Driver (version 3.9 or later), including DDL, DML, session variable setting, and show statements. 1. Install Library The relevant library is already published on PyPI. It can be installed simply as follows: C++ pip install adbc_driver_manager pip install adbc_driver_flightsql Import the following module/library to interact with the installed library: Python import adbc_driver_manager import adbc_driver_flightsql.dbapi as flight_sql 2. Connect to Doris Create a client for interacting with the Doris Arrow Flight SQL service. 
Prerequisites include Doris frontend (FE) host, Arrow Flight port, and login username/password. Configure parameters for Doris frontend (FE) and backend (BE): In fe/conf/fe.conf, set arrow_flight_sql_port to an available port, such as 9090. In be/conf/be.conf, set arrow_flight_port to an available port, such as 9091. Suppose that the Arrow Flight SQL services for the Doris instance will run on ports 9090 and 9091 for FE and BE respectively, and the Doris username/password is "user" and "pass", the connection process would be: C++ conn = flight_sql.connect(uri="grpc://127.0.0.1:9090", db_kwargs={ adbc_driver_manager.DatabaseOptions.USERNAME.value: "user", adbc_driver_manager.DatabaseOptions.PASSWORD.value: "pass", }) cursor = conn.cursor() Once the connection is established, you can interact with Doris using SQL statements through the returned cursor object. This allows you to perform various operations such as table creation, metadata retrieval, data import, and query execution. 3. Create a Table and Retrieve MetadataPass the query to the cursor.execute() function, which creates tables and retrieves metadata. C++ cursor.execute("DROP DATABASE IF EXISTS arrow_flight_sql FORCE;") print(cursor.fetchallarrow().to_pandas()) cursor.execute("create database arrow_flight_sql;") print(cursor.fetchallarrow().to_pandas()) cursor.execute("show databases;") print(cursor.fetchallarrow().to_pandas()) cursor.execute("use arrow_flight_sql;") print(cursor.fetchallarrow().to_pandas()) cursor.execute("""CREATE TABLE arrow_flight_sql_test ( k0 INT, k1 DOUBLE, K2 varchar(32) NULL DEFAULT "" COMMENT "", k3 DECIMAL(27,9) DEFAULT "0", k4 BIGINT NULL DEFAULT '10', k5 DATE, ) DISTRIBUTED BY HASH(k5) BUCKETS 5 PROPERTIES("replication_num" = "1");""") print(cursor.fetchallarrow().to_pandas()) cursor.execute("show create table arrow_flight_sql_test;") print(cursor.fetchallarrow().to_pandas()) If the returned StatusResult is 0, which means the query is executed successfully. (Such design is to ensure compatibility with JDBC.) C++ StatusResult 0 0 StatusResult 0 0 Database 0 __internal_schema 1 arrow_flight_sql .. ... 507 udf_auth_db [508 rows x 1 columns] StatusResult 0 0 StatusResult 0 0 Table Create Table 0 arrow_flight_sql_test CREATE TABLE `arrow_flight_sql_test` (\n `k0`... 4. Ingest DataExecute an INSERT INTO statement to load test data into the table created: C++ cursor.execute("""INSERT INTO arrow_flight_sql_test VALUES ('0', 0.1, "ID", 0.0001, 9999999999, '2023-10-21'), ('1', 0.20, "ID_1", 1.00000001, 0, '2023-10-21'), ('2', 3.4, "ID_1", 3.1, 123456, '2023-10-22'), ('3', 4, "ID", 4, 4, '2023-10-22'), ('4', 122345.54321, "ID", 122345.54321, 5, '2023-10-22');""") print(cursor.fetchallarrow().to_pandas()) If you see the following returned result, the data ingestion is successful. C++ StatusResult 0 0 If the data size to ingest is huge, you can apply the Stream Load method using pydoris. 5. Execute QueriesPerform queries on the above table, such as aggregation, sorting, and session variable setting. 
C++ cursor.execute("select * from arrow_flight_sql_test order by k0;") print(cursor.fetchallarrow().to_pandas()) cursor.execute("set exec_mem_limit=2000;") print(cursor.fetchallarrow().to_pandas()) cursor.execute("show variables like \"%exec_mem_limit%\";") print(cursor.fetchallarrow().to_pandas()) cursor.execute("select k5, sum(k1), count(1), avg(k3) from arrow_flight_sql_test group by k5;") print(cursor.fetchallarrow().to_pandas()) The results are as follows: C++ k0 k1 K2 k3 k4 k5 0 0 0.10000 ID 0.000100000 9999999999 2023-10-21 1 1 0.20000 ID_1 1.000000010 0 2023-10-21 2 2 3.40000 ID_1 3.100000000 123456 2023-10-22 3 3 4.00000 ID 4.000000000 4 2023-10-22 4 4 122345.54321 ID 122345.543210000 5 2023-10-22 [5 rows x 6 columns] StatusResult 0 0 Variable_name Value Default_Value Changed 0 exec_mem_limit 2000 2147483648 1 k5 Nullable(Float64)_1 Int64_2 Nullable(Decimal(38, 9))_3 0 2023-10-22 122352.94321 3 40784.214403333 1 2023-10-21 0.30000 2 0.500050005 [2 rows x 5 columns] 6. Complete Code C++ # Doris Arrow Flight SQL Test # step 1, library is released on PyPI and can be easily installed. # pip install adbc_driver_manager # pip install adbc_driver_flightsql import adbc_driver_manager import adbc_driver_flightsql.dbapi as flight_sql # step 2, create a client that interacts with the Doris Arrow Flight SQL service. # Modify arrow_flight_sql_port in fe/conf/fe.conf to an available port, such as 9090. # Modify arrow_flight_port in be/conf/be.conf to an available port, such as 9091. conn = flight_sql.connect(uri="grpc://127.0.0.1:9090", db_kwargs={ adbc_driver_manager.DatabaseOptions.USERNAME.value: "root", adbc_driver_manager.DatabaseOptions.PASSWORD.value: "", }) cursor = conn.cursor() # interacting with Doris via SQL using Cursor def execute(sql): print("\n### execute query: ###\n " + sql) cursor.execute(sql) print("### result: ###") print(cursor.fetchallarrow().to_pandas()) # step3, execute DDL statements, create database/table, show stmt. execute("DROP DATABASE IF EXISTS arrow_flight_sql FORCE;") execute("show databases;") execute("create database arrow_flight_sql;") execute("show databases;") execute("use arrow_flight_sql;") execute("""CREATE TABLE arrow_flight_sql_test ( k0 INT, k1 DOUBLE, K2 varchar(32) NULL DEFAULT "" COMMENT "", k3 DECIMAL(27,9) DEFAULT "0", k4 BIGINT NULL DEFAULT '10', k5 DATE, ) DISTRIBUTED BY HASH(k5) BUCKETS 5 PROPERTIES("replication_num" = "1");""") execute("show create table arrow_flight_sql_test;") # step4, insert into execute("""INSERT INTO arrow_flight_sql_test VALUES ('0', 0.1, "ID", 0.0001, 9999999999, '2023-10-21'), ('1', 0.20, "ID_1", 1.00000001, 0, '2023-10-21'), ('2', 3.4, "ID_1", 3.1, 123456, '2023-10-22'), ('3', 4, "ID", 4, 4, '2023-10-22'), ('4', 122345.54321, "ID", 122345.54321, 5, '2023-10-22');""") # step5, execute queries, aggregation, sort, set session variable execute("select * from arrow_flight_sql_test order by k0;") execute("set exec_mem_limit=2000;") execute("show variables like \"%exec_mem_limit%\";") execute("select k5, sum(k1), count(1), avg(k3) from arrow_flight_sql_test group by k5;") # step6, close cursor cursor.close() Examples of Data Transmission at Scale 1. PythonIn Python, after connecting to Doris using the ADBC Driver, you can use various ADBC APIs to load the Clickbench dataset from Doris into Python. 
Here's how: Python #!/usr/bin/env python # -*- coding: utf-8 -*- import adbc_driver_manager import adbc_driver_flightsql.dbapi as flight_sql import pandas from datetime import datetime my_uri = "grpc://0.0.0.0:`fe.conf_arrow_flight_port`" my_db_kwargs = { adbc_driver_manager.DatabaseOptions.USERNAME.value: "root", adbc_driver_manager.DatabaseOptions.PASSWORD.value: "", } sql = "select * from clickbench.hits limit 1000000;" # PEP 249 (DB-API 2.0) API wrapper for the ADBC Driver Manager. def dbapi_adbc_execute_fetchallarrow(): conn = flight_sql.connect(uri=my_uri, db_kwargs=my_db_kwargs) cursor = conn.cursor() start_time = datetime.now() cursor.execute(sql) arrow_data = cursor.fetchallarrow() dataframe = arrow_data.to_pandas() print("\n##################\n dbapi_adbc_execute_fetchallarrow" + ", cost:" + str(datetime.now() - start_time) + ", bytes:" + str(arrow_data.nbytes) + ", len(arrow_data):" + str(len(arrow_data))) print(dataframe.info(memory_usage='deep')) print(dataframe) # ADBC reads data into pandas dataframe, which is faster than fetchallarrow first and then to_pandas. def dbapi_adbc_execute_fetch_df(): conn = flight_sql.connect(uri=my_uri, db_kwargs=my_db_kwargs) cursor = conn.cursor() start_time = datetime.now() cursor.execute(sql) dataframe = cursor.fetch_df() print("\n##################\n dbapi_adbc_execute_fetch_df" + ", cost:" + str(datetime.now() - start_time)) print(dataframe.info(memory_usage='deep')) print(dataframe) # Can read multiple partitions in parallel. def dbapi_adbc_execute_partitions(): conn = flight_sql.connect(uri=my_uri, db_kwargs=my_db_kwargs) cursor = conn.cursor() start_time = datetime.now() partitions, schema = cursor.adbc_execute_partitions(sql) cursor.adbc_read_partition(partitions[0]) arrow_data = cursor.fetchallarrow() dataframe = arrow_data.to_pandas() print("\n##################\n dbapi_adbc_execute_partitions" + ", cost:" + str(datetime.now() - start_time) + ", len(partitions):" + str(len(partitions))) print(dataframe.info(memory_usage='deep')) print(dataframe) dbapi_adbc_execute_fetchallarrow() dbapi_adbc_execute_fetch_df() dbapi_adbc_execute_partitions() The results are as follows (omitting the repeated outputs). It only takes 3s to load a Clickbench dataset containing 1 million rows and 105 columns. Python ################## dbapi_adbc_execute_fetchallarrow, cost:0:00:03.548080, bytes:784372793, len(arrow_data):1000000 <class 'pandas.core.frame.DataFrame'> RangeIndex: 1000000 entries, 0 to 999999 Columns: 105 entries, CounterID to CLID dtypes: int16(48), int32(19), int64(6), object(32) memory usage: 2.4 GB None CounterID EventDate UserID EventTime WatchID JavaEnable Title GoodEvent ... UTMCampaign UTMContent UTMTerm FromTag HasGCLID RefererHash URLHash CLID 0 245620 2013-07-09 2178958239546411410 2013-07-09 19:30:27 8302242799508478680 1 OWAProfessionov — Мой Круг (СВАО Интернет-магазин 1 ... 0 -7861356476484644683 -2933046165847566158 0 999999 1095 2013-07-03 4224919145474070397 2013-07-03 14:36:17 6301487284302774604 0 @дневники Sinatra (ЛАДА, цена для деталли кто ... 1 ... 0 -296158784638538920 1335027772388499430 0 [1000000 rows x 105 columns] ################## dbapi_adbc_execute_fetch_df, cost:0:00:03.611664 ################## dbapi_adbc_execute_partitions, cost:0:00:03.483436, len(partitions):1 ################## low_level_api_execute_query, cost:0:00:03.523598, stream.address:139992182177600, rows:-1, bytes:784322926, len(arrow_data):1000000 ################## low_level_api_execute_partitions, cost:0:00:03.738128streams.size:3, 1, -1 2. 
JDBC Usage of this driver is similar to that of the driver for the MySQL protocol. You just need to replace jdbc:mysql in the connection URL with jdbc:arrow-flight-sql. The returned result will be in the JDBC ResultSet data structure. Java import java.sql.Connection; import java.sql.DriverManager; import java.sql.ResultSet; import java.sql.Statement; Class.forName("org.apache.arrow.driver.jdbc.ArrowFlightJdbcDriver"); String DB_URL = "jdbc:arrow-flight-sql://0.0.0.0:9090?useServerPrepStmts=false" + "&cachePrepStmts=true&useSSL=false&useEncryption=false"; String USER = "root"; String PASS = ""; Connection conn = DriverManager.getConnection(DB_URL, USER, PASS); Statement stmt = conn.createStatement(); ResultSet resultSet = stmt.executeQuery("show tables;"); while (resultSet.next()) { String col1 = resultSet.getString(1); System.out.println(col1); } resultSet.close(); stmt.close(); conn.close(); 3. Java Similar to Python, you can directly create an ADBC client in Java to read data from Doris. First, you need to obtain the FlightInfo. Then, you connect to each endpoint to pull the data. Java // method one AdbcStatement stmt = connection.createStatement() stmt.setSqlQuery("SELECT * FROM " + tableName) // executeQuery, two steps: // 1. Execute Query and get returned FlightInfo; // 2. Create FlightInfoReader to sequentially traverse each Endpoint; QueryResult queryResult = stmt.executeQuery() // method two AdbcStatement stmt = connection.createStatement() stmt.setSqlQuery("SELECT * FROM " + tableName) // Execute Query and parse each Endpoint in FlightInfo, and use the Location and Ticket to construct a PartitionDescriptor partitionResult = stmt.executePartitioned(); partitionResult.getPartitionDescriptors() // Create ArrowReader for each PartitionDescriptor to read data ArrowReader reader = connection2.readPartition(partitionResult.getPartitionDescriptors().get(0).getDescriptor()) 4. Spark For Spark users, apart from connecting to the Flight SQL server using JDBC and Java, you can apply the Spark-Flight-Connector, which enables Spark to act as a client for reading and writing data from/to a Flight SQL server. This is made possible by the fast data conversion between the Arrow format and the Block in Apache Doris, which is 10 times faster than the conversion between CSV and Block. Moreover, the Arrow data format provides more comprehensive and robust support for complex data types such as Map and Array. Hop on the Trend Train A number of enterprise users of Doris have tried loading data from Doris to Python, Spark, and Flink using Arrow Flight SQL and enjoyed much faster data reading speeds. In the future, we plan to include support for Arrow Flight SQL in data writing, too. By then, most systems built with mainstream programming languages will be able to read and write data from/to Apache Doris via an ADBC client. That's high-speed data interaction, which opens up numerous possibilities. On our to-do list, we also envision leveraging Arrow Flight to implement parallel data reading by multiple backends and facilitate federated queries across Doris and Spark. Download Apache Doris 2.1 and get a taste of 100 times faster data transfer powered by Arrow Flight SQL. If you need assistance, come find us in the Apache Doris developer and user community.
What Is a Message Broker? A message broker is an important component of asynchronous distributed systems. It acts as a bridge in the producer-consumer pattern. Producers write messages to the broker, and consumers read messages from the broker. The broker handles queuing, routing, and delivery of messages. The diagram below shows how the broker is used in the producer-consumer pattern: This article discusses the popular brokers used today and when to use them. Simple Queue Service (SQS) Simple Queue Service (SQS) is offered by Amazon Web Services (AWS) as a managed message queue service. AWS fully manages the queue, making SQS an easy solution for passing messages between different components of software running on AWS infrastructure. The section below details what is supported and what is not in SQS. Supported Pay for what you use: SQS only charges for the messages read from and written to the queue. There is no recurring or base charge for using SQS. Ease of setup: SQS is a fully managed AWS service, so no infrastructure setup is required to use it. Reading and writing are also simple, either through the REST APIs provided by SQS or through AWS Lambda functions. Support for FIFO queues: Besides regular standard queues, SQS also supports FIFO queues. For applications that need strict ordering of messages, FIFO queues come in handy. Scale: SQS scales elastically with the application, so there is no need to worry about capacity and pre-provisioning. There is no limit to the number of messages per queue, and queues offer nearly unlimited throughput. Queue for failed messages/dead-letter queue: All the messages that can't be processed are sent to a "dead-letter" queue. SQS takes care of moving messages automatically into the dead-letter queue based on the retry configuration of the main queue. Not Supported Lack of message broadcast: Because SQS delivers each message to a single consumer ("exactly once" transmission), multiple consumers cannot retrieve the same message. For multiple-consumer use cases, SQS needs to be used along with AWS SNS, with multiple queues subscribed to the same SNS topic. Replay: SQS doesn't have the ability to replay old messages. Replay is sometimes required for debugging and testing. Kinesis Kinesis is another AWS offering. Kinesis streams enable large-scale data ingestion and real-time processing of streaming data. Like SQS, Kinesis is also a fully managed service. Below are details of what is supported and what is not in Kinesis. Supported Ease of setup: Kinesis, like SQS, is a fully managed AWS service; no infrastructure setup is required. Message broadcast: Kinesis allows multiple consumers to read the same message from the stream concurrently. AWS integration: Kinesis integrates seamlessly with other AWS services. Replay: Kinesis allows messages to be replayed from as far back as seven days in the past and gives clients the ability to consume messages at a later time. Real-time analytics: Provides support for ingestion, processing, and analysis of large data streams in real time. Not Supported Strict message ordering: Kinesis supports in-order processing within a shard; however, it provides no guarantee on ordering between shards. Lack of dead-letter queue: There is no support for a dead-letter queue out of the box. Every application that consumes the stream has to deal with failures on its own. Auto-scaling: Kinesis streams don't scale dynamically in response to demand.
Streams need to be provisioned ahead of time to meet the anticipated demand of both producers and consumers. Cost: For large volumes of data, pricing can be quite high in comparison to other brokers. Kafka Kafka is a distributed event store and stream-processing platform. It is an open-source system developed by the Apache Software Foundation. Kafka is famous for its high throughput and scalability. It excels in real-time analytics and monitoring. Below are details of what is supported and what is not in Kafka. Supported Message broadcast: Kafka allows multiple consumers to read the same message from the stream. Replay: Kafka allows messages to be replayed from a specific point in a topic. The message retention policy decides how far back a message can be replayed. Unlimited message retention: Kafka allows unlimited message retention based on the retention policy configured. Real-time analytics: Provides support for ingestion, processing, and analysis of large data streams in real time. Open source: Kafka is an open-source project, which has resulted in widespread adoption and community support. It has lots of configuration options, which provide the opportunity to fine-tune it for a specific use case. Not Supported Automated setup: Since Kafka is open source, developers need to set up the infrastructure and the Kafka cluster themselves. That said, most public cloud providers offer managed Kafka. Simple onboarding: For Kafka clusters that are not provisioned through managed services, understanding the infrastructure can become a daunting task. Apache does provide lots of documentation, but it takes time for new developers to understand. Queue semantics: In the true sense, Kafka is a distributed immutable event log, not a queuing system. It does not inherently support distributing tasks to multiple workers so that each task is processed exactly once. Dynamic partitions: It is difficult to dynamically change the number of partitions in a Kafka topic. This limits the scalability of the system when the workload increases; a large number of partitions needs to be pre-provisioned to support the maximum load. Pulsar Pulsar is an open-source, distributed messaging and streaming platform developed by the Apache Software Foundation. It provides a highly scalable, flexible, and durable platform for real-time data streaming and processing. Below are details of what is supported and what is not in Pulsar. Supported Multi-tenancy: Pulsar supports multi-tenancy as a first-class citizen. It provides access control across data and actions using tenant policies. Seamless geo-replication: Pulsar synchronizes data across multiple regions without any third-party replication tools. Replay: Similar to Kafka, Pulsar allows messages to be replayed from a specific point in a topic. The message retention policy decides how far back a message can be replayed. Unlimited message retention: Similar to Kafka, Pulsar allows unlimited message retention based on the retention policy configured. Flexible models: Pulsar supports both streaming and queuing models. It provides strict message ordering within a partition. Not Supported Automated setup: Similar to Kafka, Pulsar is open source, and developers need to set up the infrastructure themselves. Robust ecosystem: Pulsar is relatively new compared to Kafka and doesn't yet have community support as large as Kafka's. Out-of-the-box integration: Pulsar lacks out-of-the-box integrations and support compared to Kafka and SQS.
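Before wrapping up the comparison, here is a minimal sketch that grounds the producer-consumer pattern described at the start of this article, using SQS through boto3. The queue URL, region, and message body are placeholder assumptions; the queue is assumed to already exist and AWS credentials to be configured in the environment.

Python
import boto3

# Hypothetical queue URL; the queue must already exist and credentials must be configured.
QUEUE_URL = "https://sqs.us-east-1.amazonaws.com/123456789012/orders-queue"
sqs = boto3.client("sqs", region_name="us-east-1")

# Producer: write a message to the broker.
sqs.send_message(QueueUrl=QUEUE_URL, MessageBody='{"order_id": 42, "status": "created"}')

# Consumer: read messages from the broker, process them, then acknowledge by deleting.
response = sqs.receive_message(QueueUrl=QUEUE_URL, MaxNumberOfMessages=10, WaitTimeSeconds=5)
for message in response.get("Messages", []):
    print("processing:", message["Body"])
    # If the message is not deleted, it becomes visible again after the visibility timeout.
    sqs.delete_message(QueueUrl=QUEUE_URL, ReceiptHandle=message["ReceiptHandle"])

The same produce/consume shape applies to Kinesis, Kafka, and Pulsar; what changes is the client library, the delivery guarantees, and the operational model discussed above.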
Conclusion Managed services require minimal maintenance effort, but non-managed services need regular, dedicated maintenance capacity. On the flip side, non-managed services provide better flexibility and tuning opportunities than managed services. In the end, choosing the right broker depends on the project's needs. Understanding the strengths and gaps of each broker helps developers make informed decisions.
If you have followed any tutorial on data science in Python, chances are that you have heard about a library known as Pandas. Pandas provides a dataframe interface for your data, which is a two-dimensional tabular data structure that "shares many properties with matrices" (in the words of `R`'s definition of dataframes). Dataframes have grown over the years to be an essential component of the modern data scientist’s toolkit. In the words of Pandas creator Wes McKinney: (pandas) is what people use for data ingest, data prep, and feature engineering for machine learning models. The existence of other database systems that perform equivalent tasks isn't useful if they are not accessible to Python programmers with a convenient API. The latter part of McKinney's quote is (arguably) why Pandas and other similar models have been adopted by the data science community over more formally grounded models from the database community (e.g., relational tables). While it may be tempting to equate relational tables to dataframes, the two are fundamentally different. In Couchbase (and most other modern databases), data is conceptually represented as unordered (multi)-sets of records. The "set of records" abstraction gives databases more opportunities to execute queries more efficiently and over data LTM (larger than memory). On the other hand, (R and Pandas) dataframes are more akin to matrices in that the order of rows and columns matters. Data scientists have grown to expect support for operations like transpose (flipping the dataframe over its diagonal) and index-based access (e.g., df.iat[1, 2]), both of which are not as easy (or impossible) to perform with languages like SQL. I'll point to "Is a Dataframe Just a Table" by Yifan Wu for a deeper discussion of the two abstractions. Regardless of how dataframes have found their way into the mainstream, many Python data scientists simply prefer performing their EDA (exploratory data analysis) with Pandas. In this article, we'll explore how users more comfortable with dataframes can work with their Big Data in Couchbase's Analytics Service. We'll use the Yelp Dataset for our discussion, where we have the following collections: businesses, reviews, and checkins. The setup script can be found here. We will cover how to prepare a set of features with: the Couchbase Python SDK and Pandas (the de-facto Python dataframe standard) ; the Couchbase Python SDK and Modin (a scalable drop-in Pandas replacement) ; the Couchbase Python SDK and SQL++ ; and AFrame (an in-situ, no ETL, dataframe view for Couchbase collections). Couchbase and Pandas To start, let's assume that we want to predict whether a business is still "in business". More specifically, we want to predict the is_open flag of a business. For brevity, we omit the model training details in this article to focus on the feature engineering process. In the snippet below, we use the Couchbase Python SDK to access a local Couchbase cluster hosting an Analytics node. We use three simple SQL++ queries to load each dataset into three separate dataframes. 
Python from couchbase.auth import PasswordAuthenticator from couchbase.cluster import Cluster from couchbase.options import ClusterOptions import pandas as pd auth = PasswordAuthenticator(username='admin', password='password') cluster = Cluster('couchbase://127.0.0.1', ClusterOptions(auth)) result_to_df = lambda r: pd.DataFrame(list(cluster.analytics_query(r))) businesses_df = result_to_df("FROM yelp.businesses b SELECT VALUE b") checkins_df = result_to_df("FROM yelp.checkins c SELECT VALUE c") # reviews_df = result_to_df("FROM yelp.reviews r SELECT VALUE r") Note that the last line is commented out — this is its own purpose! (We will handle the review collection later.) There are approximately 7 million total reviews in our Yelp dataset and loading all reviews into memory is more than my laptop can handle. In general, McKinney suggests Pandas users should have 5-10x more RAM than the size of their dataset. Herein lies the major roadblock data scientists with Big Data must work around: the inability to elegantly reason about data LTM. Our data resides in Couchbase, and to reason about our data in Pandas we need to copy our data from Couchbase into our Python process memory. Even if we had a workstation large enough to hold our data, we must now deal with the problem of staleness. If new records arrive in Couchbase after we build our dataframes, these new records will not be used for the rest of our analysis (or more optimistically, until we rebuild our dataframes). Having addressed the elephant in the room, let us now continue our feature engineering. The goal for each section (i.e., Couchbase and Pandas, Couchbase and Modin, Couchbase and SQL++, and Couchbase and AFrame) is to find some numeric and tabular representation of all businesses that we can subsequently use as input to an ML model. Suppose that we mess around and settle on the following numeric features: An n-hot encoding of the top 10 categories for a business. The total number of check-ins a business has. The total number of reviews a business has. The lowest star rating for a review about a given business. The highest star rating for a review about a given business. The average star rating for a review about a given business. The variance in star ratings for reviews about a given business. The kurtosis in star ratings for reviews about a given business. We will start with item 1 of the list above. To find the top 10 categories shared across all businesses, we 1) convert the comma-separated categories column into a column of lists, 2) "explode/unnest" the categories columns to generate a row per list entry, 3) count the distinct values for the categories column, and 4) extract the top 10 values into a set. Python top_categories = businesses_df['categories'] \ .str.split(', ') \ .explode() \ .value_counts() \ .nlargest(10) \ .keys() top_categories = set(top_categories) top_categories Plain Text {'Active Life', 'American (New)', 'American (Traditional)', 'Arts & Entertainment', 'Auto Repair', 'Automotive', 'Bakeries', 'Bars', 'Beauty & Spas', ... 'Specialty Food'} We are now ready to encode the existence of these top 10 categories as 1/0s. 
Using our business dataframe, we 1) convert the comma-separated categories column into a column of lists (again), 2) filter out all categories that are not in our previously found top 50 using a set intersection, 3) "join" the set of categories (row-wise) into a pipe-separated list, and 4) use the get_dummies() function to return a dataframe where each column corresponds to an item in step 3) list. Python ml_input_1 = businesses_df['categories']\ .str.split(', ')\ .apply(lambda a: None if a is None else '|'.join(set(a).intersection(top_categories)))\ .str.get_dummies() ml_input_1.head() Plain Text Active Life American (New) American (Traditional) Arts & Entertainment ... 0 0 0 0 0 ... 1 0 0 0 0 ... 2 0 0 0 0 ... 3 0 0 0 1 ... 4 0 0 0 0 ... [5 rows x 10 columns] The next feature requires us to use the checkins_df dataframe. We 1) join our businesses_df dataframe with the checkins_df dataframe, 2) generate a list-valued column by splitting our comma-separated-string-valued field date, and 3) apply the len function to get the number of check-ins. For businesses that do not have a corresponding checkin row, we 4) set their value to 0. The (row-wise) order of ml_input_1 is identical to our result (i.e., they share the same "index"), so we can simply use the concat function along the horizontal axis (axis=1). Python result = businesses_df \ .join(checkins_df.set_index('business_id'), on='business_id') \ ['date'] \ .str.split(',') \ .map(lambda v: len(v) if type(v) is list else 0) \ .rename("Checkin Count") ml_input_2 = pd.concat([ml_input_1, result], axis=1) ml_input_2.head() Plain Text Active Life American (New) ... Skin Care Specialty Food Checkin Count 0 0 0 ... 0 0 146 1 0 0 ... 0 0 33 2 0 0 ... 0 0 19 3 0 0 ... 0 0 3 4 0 0 ... 0 0 0 [5 rows x 11 columns] Our last set of features revolves around the distribution of star ratings for a business. We define a function below called summarize which takes in a list of star ratings and returns various statistics using the scipy.stats.describe function. Python import scipy def summarize(all_stars): d = scipy.stats.describe(all_stars) return { 'Review Count': d.nobs, 'Minimum Stars': d.minmax[0], 'Maximum Stars': d.minmax[1], 'Mean Stars': d.mean, 'Star Variance': d.variance, 'Star Skewness': d.skewness, 'Star Kurtosis': d.kurtosis } As mentioned above, we cannot work with all reviews in memory. Instead, we will use the LIMIT and OFFSET clauses of SQL++ to work with chunks of 1 million reviews records. Note that this approach is not impervious to data changes at the Couchbase side. A more consistent (and performant) approach requires a duplicated reviews collection (i.e., a 3rd data copy of reviews) . In the snippet below, we end up with a dataframe (stars_df) that is row-wise aligned with the business_df dataframe and possesses a list-valued column called stars. 
Python import numpy as np stars_df = businesses_df[['business_id']] \ .assign(stars=[np.nan for _ in range(len(businesses_df))]) working_offset = 0 chunk_size = 1000000 while True: partial_df_1 = result_to_df(f""" FROM yelp.reviews r SELECT r.business_id, r.stars ORDER BY r.business_id ASC LIMIT {chunk_size} OFFSET {working_offset} """) if len(partial_df_1) == 0: break working_offset += chunk_size partial_df_2 = stars_df.loc[stars_df['stars'].notna()].explode('stars') partial_df_3 = pd.concat([partial_df_1, partial_df_2], axis=0) \ .groupby(['business_id']) \ .agg(list) stars_df = stars_df[['business_id']] \ .join(partial_df_3, on='business_id') Now possessing the minimum amount of review information for a given business, our final step is to apply the summarize function and concat this results in our existing feature set... Python partial_df_4 = pd.json_normalize(stars_df['stars'].apply(summarize)) ml_input_3 = pd.concat([ml_input_2, partial_df_4], axis=1) ml_input_3.head() Plain Text Active Life American (New) American (Traditional) Arts & Entertainment \ 0 0 0 0 0 1 0 0 0 0 2 0 0 0 0 3 0 0 0 1 4 0 0 0 0 ... Skin Care Specialty Food Checkin Count Review Count Minimum Stars \ 0 ... 0 0 146 70 1.0 1 ... 0 0 33 5 1.0 2 ... 0 0 19 422 1.0 3 ... 0 0 3 8 3.0 4 ... 0 0 0 9 1.0 Maximum Stars Mean Stars Star Variance Star Skewness Star Kurtosis 0 5.0 3.342857 1.619876 -0.494108 -0.624766 1 5.0 2.800000 3.200000 0.035156 -1.581055 2 5.0 4.760664 0.747808 -3.799428 13.180355 3 5.0 4.750000 0.500000 -2.267787 3.142857 4 5.0 3.111111 4.111111 -0.158662 -1.910062 [5 rows x 18 columns] ...and now we are free to use the ml_input_3 dataframe directly as training input for some model in a machine learning library (e.g., scikit-learn)! For completeness, here's a snippet of using our feature set to train a decision tree classifier: Python import sklearn x, y = ml_input_3, businesses_df['is_open'] x_train, x_test, y_train, y_test = \ sklearn.model_selection.train_test_split(x, y, test_size=0.3, random_state=1) clf = sklearn.tree.DecisionTreeClassifier() clf = clf.fit(x_train,y_train) Couchbase and Modin The Pandas feature engineering process works best if we are able to fit our data into memory. If we are unable to fit our data into memory (e.g., the reviews dataset), we need to devise specialized solutions to consider our data in chunks. The next three sections delve into cleaner alternatives that do not require us (the data scientist) to consider how large our data is for in-core and out-of-core workloads. The first approach (and the easiest to integrate for existing Pandas users) is a library called Modin. Modin is a drop-in replacement for Pandas that translates Pandas operations into computations executed using a distributed runtime engine (i.e., Ray, Dask, or MPI/Unidist). 
We will start with three simple steps: 1) initializing our Ray backend, 2) replacing the import pandas as pd line in our first code snippet above with import modin.pandas as pd, and 3) uncommenting the line to load our reviews_df dataframe: Python import ray ray.init() from couchbase.auth import PasswordAuthenticator from couchbase.cluster import Cluster from couchbase.options import ClusterOptions import modin.pandas as pd auth = PasswordAuthenticator(username='admin', password='password') cluster = Cluster('couchbase://127.0.0.1', ClusterOptions(auth)) result_to_df = lambda r: pd.DataFrame(list(cluster.analytics_query(r))) businesses_df = result_to_df("FROM yelp.businesses b SELECT VALUE b") checkins_df = result_to_df("FROM yelp.checkins c SELECT VALUE c") reviews_df = result_to_df("FROM yelp.reviews r SELECT VALUE r") The snippet above a) moves the data out of Couchbase into b) our Python process memory, and then our Python process memory to c) Modin's Ray backend. As we will see, the subsequent operations are more performant (thanks to Modin's task parallelism) at the cost of an even more expensive loading step. Furthermore, while out-of-core operations are possible in Modin, Modin delegates this spillage to the operating system (i.e., a workload-unaware component). The next two sections will address how we can simply operate on our data within Couchbase (in-situ) using Couchbase's built-in execution engine purposed for in-core and out-of-core distributed workloads. Having addressed the "new elephant in the second room," let us continue with our feature engineering. For all non-review-related features, our code remains nearly the same (there is a bug in the get_dummies() function for Modin, so we defer the computation of the ml_input_1 dataframe to Pandas). While we could keep our hand-crafted solution that chunks reviews (Modin is meant to be a drop-in replacement), we will showcase how Modin allows us to work out-of-core with large dataframes like reviews_df. In the snippet below, we 1) aggregate all review stars by their business_id values into list-valued columns, 2) apply our summarize function, 3) convert the application of our summarize function into 7 separate columns, 4) join this result (review_summary_df) with our original businesses_df dataframe, and 5) add these 7 new columns to our ml_input_2 dataframe to produce our final ml_input_3. Python reviews_agg_df = reviews_df[['business_id', 'stars']] \ .groupby(['business_id']) \ .agg(list) \ .reset_index() review_stars_df = pd.json_normalize( reviews_agg_df \ ['stars'] \ .apply(summarize) ) review_summary_df = pd.concat([reviews_agg_df, review_stars_df], axis=1) \ .drop(columns=['stars']) \ .set_index('business_id') business_summary_df = businesses_df[['business_id']] \ .join(review_summary_df, on='business_id') ml_input_3 = pd.concat([ml_input_2, business_summary_df], axis=1) ml_input_3.head() Plain Text Active Life American (New) American (Traditional) Arts & Entertainment \ 0 0 0 0 0 1 0 0 0 0 2 0 0 0 0 3 0 0 0 1 4 0 0 0 0 ... Skin Care Specialty Food Checkin Count Review Count Minimum Stars \ 0 ... 0 0 146 70 1.0 1 ... 0 0 33 5 1.0 2 ... 0 0 19 422 1.0 3 ... 0 0 3 8 3.0 4 ... 
0 0 0 9 1.0 Maximum Stars Mean Stars Star Variance Star Skewness Star Kurtosis 0 5.0 3.342857 1.619876 -0.494108 -0.624766 1 5.0 2.800000 3.200000 0.035156 -1.581055 2 5.0 4.760664 0.747808 -3.799428 13.180355 3 5.0 4.750000 0.500000 -2.267787 3.142857 4 5.0 3.111111 4.111111 -0.158662 -1.910062 [5 rows x 18 columns] The snippet above is much more elegant than our hand-crafted solution from the previous section. Data scientists are no longer bound by their data size, and data engineers can scale up a data scientist's feature extraction code by configuring the Modin backend. Modin even offers (experimental) support for instantiating Ray clusters on AWS. The impact of data transfer between i) Couchbase, ii) Python, and iii) the Modin backend (Ray) should not be understated. The snippet above runs slower than our handcrafted approach because Modin (Ray) will naively keep the large reviews_df dataframe in its memory. If we selectively choose which review fields to pull from Couchbase... reviews_df = result_to_df("FROM yelp.reviews r SELECT r.business_id, r.stars") ...we observe a ~3x speedup in feature extraction time (3min to 1min), but we are no longer working with just the dataframe abstraction. Couchbase and SQL++ The previous two sections (more or less) cover how data scientists operate on their data using dataframes. In the first section, we illustrated how to work with Big Data in Pandas by hand-writing a chunking process ourselves. In the second section, we illustrated how we can use Modin as a drop-in replacement for Pandas to lower the barrier required for data scientists working with Big Data. In this section, we will show how data scientists can work with their Couchbase data in-situ using SQL++ instead of dataframes. As a reminder, our goal is to find some numeric and tabular representation of all businesses that we can subsequently use as input to an ML model. For the previous section, this representation was a dataframe. For this section, our goal is to build a SQL++ query that will return a Python list of numeric-valued iterables. First, let us find the top 10 categories for all businesses. The query below a) iterates over all businesses, b) splits the comma-separated string-valued field categories into an array, c) UNNESTs (a SQL++ operation equivalent to the dataframe operation explode) the array we just found, d) uses a GROUP BY clause on the split categories and the aggregate function COUNT(*) to count the number of instances per category, and e) uses the ORDER BY and the LIMIT clauses to return the top 10 keys of the groups (the category). Python results = cluster.analytics_query(""" FROM yelp.businesses b, SPLIT(b.categories, ",") c LET clean_c = TRIM(c) GROUP BY clean_c SELECT VALUE clean_c ORDER BY COUNT(*) DESC LIMIT 10 """) top_categories = list(results) top_categories Plain Text ['Restaurants', 'Food', 'Shopping', 'Home Services', 'Beauty & Spas', 'Nightlife', 'Health & Medical', 'Local Services', ... 'Skin Care'] Using the categories we just found, we will assemble a list of SELECT clause projections that return 1 if a category is found and 0 otherwise. Below, is our query result list(result) is equivalent (as a list of iterables) to the ml_input_1 dataframe from the previous two sections. Python results = cluster.analytics_query(f""" FROM yelp.businesses b SELECT {','.join(f'TO_BIGINT(CONTAINS(b.categories, "{x}")) AS `{x}`' for x in top_categories)} """) list(results)[0] JSON { "Active Life": 0, ... 
"Shopping": 1, "Skin Care": 0, "Specialty Food": 0 } We will now build upon the SQL++ query above and add a subquery to compute the total number of check-ins. In the subquery below, we a) iterate over the check-ins collection, b) split and UNNEST the comma-separated-string-valued field date, and c) correlate the subquery in the WHERE clause via the conjunct c.business_id = b.business_id. In contrast to standard SQL, SQL++ queries will always return a multiset of records. Consequently, we use the SQL++ function ARRAY_COUNT around our subquery to count the number of check-ins a business has. Our query result list(result) at this stage is equivalent (as a list of iterables) to the ml_input_2 dataframe from the previous two sections. Python results = cluster.analytics_query(f""" FROM yelp.businesses b SELECT {','.join(f'TO_BIGINT(CONTAINS(b.categories, "{x}")) AS `{x}`' for x in top_categories)}, ARRAY_COUNT(( FROM yelp.checkins c, SPLIT(c.date, ",") d WHERE c.business_id = b.business_id SELECT 1 )) AS `Checkin Count` """) list(results)[0] JSON { ... "Caterers": 0, "Checkin Count": 146, "Chicken Wings": 0, ... } The last seven features we are interested in pertaining to some statistics that were computed using scikit, a Python library. Couchbase users can leverage Python UDFs for Analytics (currently in developer preview) to execute Python code over their data using SQL++, but we will write a UDF in SQL++ that replicates the same functionality as our summarized function above. SQL CREATE ANALYTICS FUNCTION yelp.summarize(all_stars) { { "Review Count": ARRAY_COUNT(all_stars), "Minimum Stars": ARRAY_MIN(all_stars), "Maximum Stars": ARRAY_MAX(all_stars), "Mean Stars": ARRAY_AVG(all_stars), "Star Variance": ARRAY_VAR_POP(all_stars), "Star Skewness": ARRAY_SKEWNESS(all_stars), "Star Kurtosis": ARRAY_KURTOSIS(all_stars) } }; Finally, we will add to our previous SQL++ query to finish our feature-extracting query. We again start with a correlated subquery that JOINs reviews and businesses to return all-star ratings associated with a business. Per business, the results of this subquery are (conceptually) given to the yelp.summarize call. To "promote" the results of the summarized calls to the main SELECT, we leverage the v.* feature of SQL++ SELECT clause projections. Our query result list(result) is equivalent (as a list of iterables) to the ml_input_3 dataframe from the previous two sections, and is ready to be used directly as training input for some model in a machine learning library (e.g., scikit-learn): Python results = cluster.analytics_query(f""" FROM yelp.businesses b LET review_features = yelp.summarize(( FROM yelp.reviews r WHERE r.business_id = b.business_id SELECT VALUE r.stars )) SELECT {','.join(f'TO_BIGINT(CONTAINS(b.categories, "{x}")) AS `{x}`' for x in top_categories)}, ARRAY_COUNT(( FROM yelp.checkins c, SPLIT(c.date, ",") d WHERE c.business_id = b.business_id SELECT 1 )) AS `Checkin Count`, review_features.* """) list(results)[0] JSON { "Review Count": 70, "Mean Stars": 3.342857142857143, "Star Variance": 1.596734693877551, "Star Skewness": -0.49410799997654287, "Star Kurtosis": -0.624766331689814, "Minimum Stars": 1, "Maximum Stars": 5, "Restaurants": 1, "Food": 0, "Shopping": 0, ... "Checkin Count": 146 } Compared to our previous two sections, this feature collection process runs in the 40s as opposed to the 1-minute execution time from Modin and 3-minute execution time from Pandas. 
Couchbase and AFrame The approach above works best for users who prefer the SQL++ abstraction to the dataframe abstraction. All the computation is performed in-situ and there is no expensive loading step into Python. For users who prefer the dataframe abstraction and want to work with data in situ, AFrame is a viable alternative to using SQL++. In a nutshell, AFrame is a Python library that provides a dataframe syntax for collections in Couchbase Analytics. Behind the scenes, AFrame translates dataframe operations into SQL++ queries that are evaluated lazily, essentially deferring all dataframe operations to Couchbase Analytics itself. While not a drop-in replacement like Modin, AFrame gives data scientists a Pandas-esque API for users who want the performance of the previous section. To start, we will clone the AFrame repository and install AFrame using pip: Shell git clone https://github.com/psinthong/AFrame.git cd AFrame pip install . Once installed, we will import AFrame and define a dataframe for all three of our collections using the CBAnalyticsConnector connector. Python from aframe import AFrame from aframe.connector import CBAnalyticsConnector connector = CBAnalyticsConnector('http://localhost:8095', 'admin', 'password') businesses_df = AFrame(dataverse='yelp', dataset='businesses', connector=connector) checkins_df = AFrame(dataverse='yelp', dataset='checkins', connector=connector) reviews_df = AFrame(dataverse='yelp', dataset='reviews', connector=connector) We are now ready to define some features! First, let us find the top 10 categories using our businesses_df dataframe. In the snippet below, we 1) convert the comma-separated categories column into a column of lists, 2) "explode/unnest" the categories columns to generate a row per list entry, 3) count the distinct values for the categories column, and 4) extract the top 10 values into a set. Python top_categories = businesses_df['categories'] top_categories['categories'] = top_categories \ .map('split', ', ') top_categories = top_categories \ .explode('categories') \ .value_counts('categories') \ .nlargest(10, 'count') \ ['categories'] top_categories = set(top_categories) top_categories We are now ready to (again) encode the existence of these top 10 categories as 1/0s. The approach we will use with AFrame involves using a for loop to define a column for each category. The Analytics functions used here are contains and to_bigint. The ml_input_1 dataframe below (or more accurately, ml_input_1.drop('business_id').toPandas()) is equivalent to the ml_input_1 dataframes from the Pandas and Modin sections: Python ml_input_1 = businesses_df[['business_id', 'categories']] for category in top_categories: ml_input_1[category] = ml_input_1['categories'] \ .map('contains', category) \ .map('to_bigint') ml_input_1 = ml_input_1.drop('categories') ml_input_1.head() Behind the scenes, AFrame has assembled the following query to execute on Couchbase Analytics. Thanks to the composability of SQL++, AFrame (and other tools built on top of Couchbase) can define deep nested queries like the snippet below. SQL SELECT VALUE OBJECT_REMOVE(t, 'categories') FROM ( SELECT t.*, to_bigint(contains(categories, "Home Services")) AS `Home Services` FROM ( SELECT t.*, to_bigint(contains(categories, "Bars")) AS `Bars` FROM ... . . . ) t ) t In the snippet below, we 1) count the number of check-ins per business and 2) merge/join our results into the ml_input_1 dataframe from before to produce a new ml_input_2 dataframe. 
Again, the ml_input_2 dataframe below is equivalent to the ml_input_2 dataframe objects found in the first two sections.

Python

result = checkins_df
result['date'] = result \
    ['date'] \
    .map('split', ', ') \
    .map('array_count')
ml_input_2 = ml_input_1 \
    .merge(result, left_on='business_id', right_on='business_id', how='left')
ml_input_2.head()

To conclude, we will define our remaining seven features involving the reviews_df dataframe. In the snippet below, we use AFrame grouping and several Analytics aggregate functions that cover the statistics provided by yelp.summarize above.

Python

result = reviews_df[['business_id', 'stars']] \
    .groupby('business_id') \
    .agg({'stars': ['count', 'min', 'max', 'mean', 'var', 'skewness', 'kurtosis']}) \
    .rename({'count_stars': 'Review Count',
             'min_stars': 'Minimum Stars',
             'max_stars': 'Maximum Stars',
             'avg_stars': 'Mean Stars',
             'var_stars': 'Star Variance',
             'skewness_stars': 'Star Skewness',
             'kurtosis_stars': 'Star Kurtosis'})
ml_input_3 = ml_input_2 \
    .merge(result, left_on='business_id', right_on='business_id', how='left') \
    .drop('business_id')
ml_input_3.head()

AFrame was a research project that (unfortunately) never received the commercial support given to projects like Modin. Consequently, the runtime AFrame achieves here is a little under 2 minutes. While the generated SQL++ query is equivalent (or near-equivalent) to the handcrafted SQL++ query in the previous section, most databases (including Couchbase Analytics) have historically had trouble optimizing nested queries (see the work from Elliott, Cheng, Thomas-Ogbuji, and Ozsoyoglu for research in this area in the context of SPARQL to SQL).

That is not to say that AFrame should be abandoned, though: the exercise in this section has shown that AFrame is a very capable "dataframe to SQL++ query" generator. The generated SQL++ queries can then be used to guide the authoring of cleaner SQL++ queries that are executed using the Couchbase Python SDK. While not as clean as Modin, this workflow (AFrame + Couchbase SDK) gives data scientists the resource efficiency of the previous section and the dataframe user model of the first two sections.

Conclusion

It seems we are still in need of that "silver bullet": an API that gives us efficient out-of-core execution with a dataframe user model. In this article, we looked at four different approaches for generating features from data stored in Couchbase Analytics: 1) Pandas, 2) Modin, 3) SQL++, and 4) AFrame. Pandas, the de facto dataframe standard for Python, has massive adoption in the data science community but struggles with big data; Modin is a (near) drop-in replacement for Pandas that allows users to scale their Pandas workflows, at the cost of a (potentially) expensive loading step; SQL++ is a non-dataframe user model that lets users express efficient in-situ execution over Couchbase data; and AFrame is a dataframe wrapper that translates dataframe operations into SQL++ queries.
In today's data-driven world, efficient data processing is paramount for organizations seeking insights and making informed decisions. Google Cloud Platform (GCP) offers powerful tools such as Apache Airflow and BigQuery for streamlining data processing workflows. In this guide, we'll explore how to leverage these tools to create robust and scalable data pipelines.

Setting up Apache Airflow on Google Cloud Platform

Apache Airflow, an open-source platform, orchestrates intricate workflows. It allows developers to define, schedule, and monitor workflows using Directed Acyclic Graphs (DAGs), providing flexibility and scalability for data processing tasks. Setting up Airflow on GCP is straightforward using managed services like Cloud Composer. Follow these steps to get started:

Create a Google Cloud Composer environment: Navigate to the Cloud Composer section in the GCP Console and create a new environment. Choose the desired configuration options, such as the number of nodes and machine type.
Install additional Python packages: Airflow supports custom Python packages for extending its functionality. You can install additional packages using the requirements.txt file or by installing them directly from within Airflow's web interface.
Configure connections: Airflow uses connection objects to connect to external systems like BigQuery. Configure the necessary connections in Airflow's web interface by providing credentials and connection details.

Designing Data Pipelines With Apache Airflow

Once Airflow is set up, you can design data pipelines using Directed Acyclic Graphs (DAGs). A DAG represents a workflow composed of tasks, where each task performs a specific data processing operation. Here's how to design data pipelines with Airflow:

Define DAGs: Create Python scripts to define DAGs in Airflow. Each DAG script should import the necessary modules and define tasks using operators provided by Airflow, such as BigQueryOperator for interacting with BigQuery.

Python

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.bigquery_operator import BigQueryOperator
from airflow.contrib.operators.bigquery_to_gcs import BigQueryToGCSOperator
from datetime import datetime

# Define the default arguments for the DAG
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2024, 3, 3),
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1
}

# Instantiate the DAG object
dag = DAG(
    'bigquery_data_pipeline',
    default_args=default_args,
    description='A DAG for data pipeline with BigQuery tasks',
    schedule_interval='@daily'
)

# Define tasks
start_task = DummyOperator(task_id='start_task', dag=dag)
end_task = DummyOperator(task_id='end_task', dag=dag)

# Define BigQuery tasks
bq_query_task1 = BigQueryOperator(
    task_id='bq_query_task1',
    sql='SELECT * FROM your_table',
    destination_dataset_table='your_project.your_dataset.output_table1',
    write_disposition='WRITE_TRUNCATE',
    dag=dag
)

bq_query_task2 = BigQueryOperator(
    task_id='bq_query_task2',
    sql='SELECT * FROM your_table WHERE date > TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 1 DAY)',
    destination_dataset_table='your_project.your_dataset.output_table2',
    write_disposition='WRITE_APPEND',
    dag=dag
)

# Define task dependencies
start_task >> bq_query_task1 >> bq_query_task2 >> end_task

In this example:

We define a DAG named bigquery_data_pipeline with a daily schedule interval using the schedule_interval parameter set to '@daily'.
Two dummy tasks (start_task and end_task) are defined using DummyOperator. These tasks serve as placeholders and are not associated with any actual processing.
Two BigQuery tasks (bq_query_task1 and bq_query_task2) are defined using BigQueryOperator. These tasks execute SQL queries on BigQuery and store the results in destination tables.
Each BigQueryOperator specifies the SQL query to be executed (sql parameter), the destination dataset and table (destination_dataset_table parameter), and the write disposition (write_disposition parameter).
Task dependencies are defined such that bq_query_task1 must run before bq_query_task2, and both bq_query_task1 and bq_query_task2 must run between start_task and end_task.

By defining DAGs in this manner, you can create robust data pipelines in Apache Airflow that interact with BigQuery for data processing and analysis. Adjust the SQL queries and destination tables as needed to suit your specific use case.

Configure task dependencies: Specify task dependencies within DAGs to ensure proper execution order. Airflow allows you to define dependencies using the set_upstream and set_downstream methods.

Python

# Define tasks
task1 = DummyOperator(task_id='task1', dag=dag)
task2 = DummyOperator(task_id='task2', dag=dag)
task3 = DummyOperator(task_id='task3', dag=dag)
task4 = DummyOperator(task_id='task4', dag=dag)

# Set task dependencies
task1.set_downstream(task2)
task1.set_downstream(task3)
task2.set_downstream(task4)
task3.set_downstream(task4)

In this example:

We create a DAG named sample_dag with a daily schedule interval.
Four tasks (task1, task2, task3, task4) are defined using DummyOperator, which represents placeholder tasks.
Task dependencies are configured using the set_downstream method. In this case, task2 and task3 are downstream of task1, and task4 is downstream of both task2 and task3.

This setup ensures that task1 will be executed first, followed by task2 and task3 (which can run in parallel), and finally task4 will be executed after both task2 and task3 are completed.

Set task schedules: Configure task schedules within DAGs to control when they should be executed. Airflow supports various scheduling options, including cron expressions and interval schedules.

Python

# Set task schedules
from datetime import datetime, timedelta  # timedelta is needed for the offsets below

task1_execution_time = datetime(2024, 3, 3, 10, 0, 0)  # Task 1 scheduled to run at 10:00 AM
task2_execution_time = task1_execution_time + timedelta(hours=1)  # Task 2 scheduled to run 1 hour after Task 1
task3_execution_time = task1_execution_time + timedelta(hours=2)  # Task 3 scheduled to run 2 hours after Task 1

task1.execution_date = task1_execution_time
task2.execution_date = task2_execution_time
task3.execution_date = task3_execution_time

# Define task dependencies
task1.set_downstream(task2)
task2.set_downstream(task3)

In this example:

We create a DAG named sample_scheduled_dag with a daily schedule interval using the schedule_interval parameter set to '@daily'.
Task schedules are configured by specifying the execution_date for each task. task1 is scheduled to run at 10:00 AM, task2 is scheduled to run 1 hour after task1, and task3 is scheduled to run 2 hours after task1.
Task dependencies are set up such that task2 is downstream of task1, and task3 is downstream of task2.

By configuring task schedules within the DAG, you can control when each task should be executed, allowing for precise orchestration of data processing workflows in Apache Airflow. A minimal, self-contained DAG definition of the kind referenced in these examples is sketched below.
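The examples above refer to DAGs (sample_dag, sample_scheduled_dag) whose definitions are not shown in the snippets. The following is a minimal sketch, reusing the Airflow imports already used in this guide, of what such a DAG definition could look like; the DAG id, dates, and description are illustrative assumptions.

Python

# A minimal sketch of the sample_scheduled_dag referenced above (names and dates are illustrative).
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from datetime import datetime

default_args = {
    'owner': 'airflow',
    'start_date': datetime(2024, 3, 3)
}

# '@daily' makes the scheduler trigger one DAG run per day.
dag = DAG(
    'sample_scheduled_dag',
    default_args=default_args,
    description='Minimal example of schedule_interval plus task dependencies',
    schedule_interval='@daily'
)

task1 = DummyOperator(task_id='task1', dag=dag)
task2 = DummyOperator(task_id='task2', dag=dag)
task3 = DummyOperator(task_id='task3', dag=dag)

# Bitshift syntax is equivalent to set_downstream: task1 -> task2 -> task3.
task1 >> task2 >> task3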
Integrating With BigQuery for Data Processing

BigQuery, offered by Google Cloud, is a fully managed and serverless data warehouse solution. It offers high-performance SQL queries and scalable storage for analyzing large datasets. Here's how to integrate BigQuery with Apache Airflow for data processing:

Execute SQL queries: Using the BigQueryOperator, you can execute SQL queries on BigQuery as part of your Apache Airflow DAGs, enabling seamless integration of data processing workflows with Google BigQuery. Adjust the SQL queries and destination tables as needed to match your specific requirements.

Load and export data: Airflow allows you to load data into BigQuery from external sources or export data from BigQuery to other destinations. Use operators like BigQueryToBigQueryOperator and BigQueryToGCSOperator for data loading and exporting operations.

Python

# Define BigQuery tasks for loading data from external source
bq_load_external_data_task = BigQueryToBigQueryOperator(
    task_id='bq_load_external_data',
    source_project_dataset_table='external_project.external_dataset.external_table',
    destination_project_dataset_table='your_project.your_dataset.internal_table',
    write_disposition='WRITE_TRUNCATE',
    create_disposition='CREATE_IF_NEEDED',
    dag=dag
)

# Define BigQuery tasks for exporting data to Google Cloud Storage (GCS)
bq_export_to_gcs_task = BigQueryToGCSOperator(
    task_id='bq_export_to_gcs',
    source_project_dataset_table='your_project.your_dataset.internal_table',
    destination_cloud_storage_uris=['gs://your_bucket/your_file.csv'],
    export_format='CSV',
    dag=dag
)

# Define task dependencies
start_task >> bq_load_external_data_task >> bq_export_to_gcs_task >> end_task

Monitor and manage jobs: Airflow provides built-in monitoring and logging capabilities for managing BigQuery jobs. Monitor job statuses, view logs, and handle job failures using Airflow's web interface or command-line tools. Here's how you can effectively monitor and manage BigQuery jobs in Airflow:

1. Airflow Web Interface

DAG Runs page: The Airflow web interface provides a "DAG Runs" page where you can view the status of each DAG run. This includes information on whether the DAG run succeeded, failed, or is currently running.
Task instance logs: You can access logs for each task instance within a DAG run. These logs provide detailed information about task execution, including any errors or exceptions encountered.
Graph view: The graph view in the Airflow UI provides a visual representation of the DAG and its task dependencies. You can use this view to understand the workflow and identify any bottlenecks or issues.

2. Command-Line Interface (CLI)

airflow dags list: Use the airflow dags list command to list all available DAGs in your Airflow environment. This command provides basic information about each DAG, including its status and last execution date.
airflow dags show: The airflow dags show command allows you to view detailed information about a specific DAG, including its tasks, task dependencies, and schedule intervals.
airflow tasks list: Use the airflow tasks list command to list all tasks within a specific DAG. This command provides information about each task, such as its current state and execution date.
airflow task logs: You can access task logs using the airflow task logs command. This command allows you to view logs for a specific task instance, helping you troubleshoot errors or failures.
3. Logging and Alerts

Airflow logging: Airflow logs all task executions and DAG runs, making it easy to track job progress and identify issues. You can configure logging levels and handlers to control the verbosity and destination of logs.
Alerting: Configure alerts and notifications to be triggered based on specific events, such as task failures or DAG run statuses. You can use tools like Slack, email, or PagerDuty to receive alerts and take appropriate actions.

4. Monitoring Tools

Stackdriver monitoring: If you're running Airflow on Google Cloud Platform, you can use Stackdriver Monitoring to monitor the health and performance of your Airflow environment. This includes metrics such as CPU usage, memory usage, and task execution times.
Prometheus and Grafana: Integrate Airflow with Prometheus and Grafana for advanced monitoring and visualization of performance metrics. This allows you to create custom dashboards and gain insights into the behavior of your Airflow jobs.

By leveraging these monitoring and management capabilities provided by Apache Airflow, you can effectively monitor job statuses, view logs, and handle job failures, ensuring the reliability and efficiency of your data workflows, including those involving BigQuery.

Best Practices for Streamlining Data Processing

To ensure efficient data processing workflows on Google Cloud Platform, consider the following best practices:

1. Optimize Query Performance

Use efficient SQL queries: Craft SQL queries that leverage BigQuery's capabilities efficiently. Optimize joins, aggregations, and filtering conditions to minimize the data scanned and improve query performance.
Leverage partitioning and clustering: Partition tables based on frequently filtered columns to reduce query costs and improve query performance. Utilize clustering to organize data within partitions for further optimization.
Utilize query caching: Take advantage of BigQuery's caching mechanism to avoid redundant computation. Reuse cached results for identical queries to reduce query execution time and costs.

2. Scale Resources Dynamically

Auto-scaling: Configure Airflow and associated resources to scale automatically based on workload demands. Use managed services like Cloud Composer on GCP, which can automatically scale Airflow clusters based on the number of active DAGs and tasks.
Preemptible VMs: Utilize preemptible VMs (preemptible instances) for batch processing tasks that can tolerate interruptions. Preemptible VMs are cost-effective and can significantly reduce resource costs for non-critical workloads.

3. Implement Error Handling

Task retries: Configure Airflow tasks to retry automatically upon failure. Use exponential backoff strategies to gradually increase retry intervals and avoid overwhelming downstream services.
Error handling mechanisms: Implement robust error handling mechanisms within data pipelines to handle transient errors, network issues, and service interruptions gracefully. Utilize Airflow's built-in error handling features, such as on_failure_callback, to execute custom error handling logic.
Monitoring alerts: Set up monitoring alerts and notifications to proactively detect and respond to pipeline failures. Use GCP's monitoring and alerting services, such as Cloud Monitoring and Stackdriver Logging, to monitor Airflow task execution and trigger alerts based on predefined conditions. A short sketch of these retry and callback settings follows this list.
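To make the error-handling bullets concrete, here is a minimal sketch that reuses the Airflow imports from the earlier examples; the DAG id, retry values, and the notify_failure callback are illustrative assumptions rather than recommended settings.

Python

# A minimal sketch of retries with exponential backoff and a custom failure callback
# (the DAG id, retry values, and callback body are illustrative assumptions).
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from datetime import datetime, timedelta

def notify_failure(context):
    # Custom error-handling logic; 'context' carries metadata about the failed task instance.
    print(f"Task {context['task_instance'].task_id} failed")

default_args = {
    'owner': 'airflow',
    'start_date': datetime(2024, 3, 3),
    'retries': 3,                          # retry automatically upon failure
    'retry_delay': timedelta(minutes=1),   # initial wait between retries
    'retry_exponential_backoff': True,     # gradually increase the retry interval
    'on_failure_callback': notify_failure  # run custom logic once retries are exhausted
}

dag = DAG(
    'error_handling_example',
    default_args=default_args,
    schedule_interval='@daily'
)

resilient_task = DummyOperator(task_id='resilient_task', dag=dag)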
4. Monitor and Tune Performance

Performance metrics monitoring: Monitor pipeline performance metrics, including query execution time, data processing throughput, and resource utilization. Use GCP's monitoring tools to track performance metrics in real time and identify performance bottlenecks.
Fine-tune configurations: Regularly review and fine-tune pipeline configurations based on performance monitoring data. Optimize resource allocation, adjust parallelism settings, and tweak query parameters to improve overall performance.
Capacity planning: Perform capacity planning exercises to ensure that resources are provisioned optimally to meet workload demands. Scale resources up or down as needed based on historical usage patterns and projected growth.

Conclusion

By leveraging Apache Airflow and BigQuery on Google Cloud Platform, developers can streamline data processing workflows and build scalable data pipelines for analytics and decision-making. Follow the guidelines outlined in this developer guide to design efficient data pipelines, integrate with BigQuery, and implement best practices for optimizing performance and reliability. With the right tools and practices in place, organizations can unlock the full potential of their data assets and drive business success in the cloud.
The modern data stack represents the evolution of data management, shifting from traditional, monolithic systems to agile, cloud-based architectures. It is designed to handle large amounts of data, providing scalability, flexibility, and real-time processing capabilities. The stack is modular, allowing organizations to use specialized tools for each function: data ingestion, storage, transformation, and analysis, facilitating a more efficient and democratized approach to data analytics and business operations. As businesses continue to prioritize data-driven decision-making, the modern data stack has become integral to unlocking actionable insights and fostering innovation.

The Evolution of the Modern Data Stack

The Early Days: Pre-2000s

Companies used big, single systems to keep and manage their data. These were good for everyday business tasks but not so much for analyzing lots of data. Data was stored in traditional relational databases like Oracle, IBM DB2, and Microsoft SQL Server.

The Big Data Era: Early 2000s - 2010s

This period marked the beginning of a shift towards systems that could handle massive amounts of data at high speed and in various formats. We started to see a lot more data from all over, and it was coming in fast. New technology like Hadoop helped by spreading the data work across many computers.

The Rise of Cloud Data Warehouses: Mid-2010s

Cloud computing started to revolutionize data storage and processing. Cloud data warehouses like Amazon Redshift and Google BigQuery offered scalability and flexibility, changing the economics and speed of data analytics. Snowflake, a cloud-based data warehousing startup, also emerged, offering a unique architecture that separates compute and storage.

The Modern Data Stack: Late 2010s - Present

The modern data stack took shape with the rise of ELT processes, SaaS-based data integration tools, and the separation of storage and compute. This era saw the proliferation of tools designed for specific parts of the data lifecycle, enabling a more modular and efficient approach to data management.

Limitations of Traditional Data Systems

In my data engineering career, across several organizations, I've worked extensively with Microsoft SQL Server. This section draws from those experiences, providing a personal touch as I recount the challenges faced with this traditional system. Later, we'll explore how the Modern Data Stack (MDS) addresses many of these issues; some solutions were quite a revelation to me!

Scalability

Traditional SQL Server deployments were often hosted on-premises, which meant that scaling up to accommodate growing data volumes required significant hardware investments and could lead to extended downtime during upgrades. What's more, when we had less data to deal with, we still had all this extra hardware that we didn't really need, but we were still paying for it. It was like paying for a whole bus when you only need a few seats.

Complex ETL

SSIS was broadly used for ETL; while it is a powerful tool, it had certain limitations, especially when compared to more modern data integration solutions. Notably, Microsoft later addressed many of these limitations in Azure Data Factory and SQL Server Data Tools (SSDT).

API calls: SSIS initially lacked direct support for API calls. Custom scripting was required to interact with web services, complicating ETL processes.
Memory allocation: SSIS jobs needed careful memory management. Without enough server memory, complex data jobs could fail.
Auditing: Extensive auditing within SSIS packages was necessary to monitor and troubleshoot, adding to the workload.
Version control: Early versions of SSIS presented challenges with version control integration, complicating change tracking and team collaboration.
Cross-platform accessibility: Managing SSIS from non-Windows systems was difficult, as it was a Windows-centric tool.

Maintenance Demands

The maintenance of on-premises servers was resource-intensive. I recall the significant effort required to ensure systems were up to date and running smoothly, often involving downtime that had to be carefully managed.

Integration

Integrating SQL Server with newer tools and platforms was not always straightforward. It sometimes required creative workarounds, which added to the complexity of our data architecture.

How the Modern Data Stack Solved My Data Challenges

The Modern Data Stack (MDS) fixed a lot of the old problems I had with SQL Server. Now we can use the cloud to store data, which means no more spending on big, expensive servers we might not always need. Getting data from different places is easier because there are tools that do it all for us, with no more tricky coding. When it comes to sorting and cleaning up our data, we can do it straight in the database with simple commands. This avoids the headaches of managing big servers or digging through tons of data to find a tiny mistake. And when we talk about keeping our data safe and organized, the MDS has tools that make this super easy and way less of a chore. So with the MDS, we're saving time, we can move quicker, and it's a lot less hassle all around. It's like having a bunch of smart helpers who take care of the tough stuff so we can focus on the cool part: finding out what the data tells us.

Components of the Modern Data Stack

The MDS is made up of various layers, each with specialized tools that work together to streamline data processes.

Data Ingestion and Integration

The extraction and loading of data from diverse sources, including APIs, databases, and SaaS applications.
Ingestion tools: Fivetran, Stitch, Airbyte, Segment, etc.

Data Storage

Modern cloud data warehouses and data lakes offer scalable, flexible, and cost-effective storage solutions.
Cloud data warehouses: Google BigQuery, Snowflake, Redshift, etc.

Data Transformation

Tools like dbt (data build tool) enable transformation within the data warehouse using simple SQL, improving upon traditional ETL processes.

Data Analysis and Business Intelligence

Analytics and business intelligence tools allow for advanced data exploration, visualization, and sharing of insights across the organization.
Business intelligence tools: Tableau, Looker, Power BI, GoodData

Data Extraction and Reverse ETL

Enables organizations to operationalize their warehouse data by moving it back into business applications, driving action from insights.
Reverse ETL tools: Hightouch, Census

Data Orchestration

Platforms that help automate and manage data workflows, ensuring that the right data is processed at the right time.
Orchestration tools: Airflow, Astronomer, Dagster, AWS Step Functions

Data Governance and Security

Data governance focuses on the importance of managing data access, ensuring compliance, and protecting data within the MDS. Data governance also provides comprehensive management of data access, quality, and compliance while offering an organized inventory of data assets that enhances discoverability and trustworthiness.
Data catalog tools: Alation (for data cataloging), Collibra (for governance and cataloging), Apache Atlas

Data Quality

Ensures data reliability and accuracy through validation and cleaning, providing confidence in data-driven decision-making.
Data quality tools: Talend, Monte Carlo, Soda, Anomalo, Great Expectations

Data Modeling

Assists in designing and iterating on database schemas easily, supporting agile and responsive data architecture practices.
Modeling tools: Erwin, SQLDBM

Conclusion: Embracing MDS With Cost Awareness

The Modern Data Stack is pretty amazing; it's like having a Swiss army knife for handling data. It definitely makes things faster and less of a headache. But while it's super powerful and gives us a lot of cool tools, it's also important to keep an eye on the price tag. The pay-as-you-go pricing of the cloud is great because we only pay for what we use. But, just like a phone bill, if we're not careful, those little charges can add up. So, while we enjoy the awesome features of the MDS, we should also make sure to stay smart about how we use them. That way, we can keep saving time without any surprises when it comes to costs.