Wednesday, April 8, 2026
Mobile Offer

🎁 You've Got 1 Reward Left

Check if your device is eligible for instant bonuses.

Unlock Now
Survey Cash

🧠 Discover the Simple Money Trick

This quick task could pay you today — no joke.

See It Now
Top Deals

📦 Top Freebies Available Near You

Get hot mobile rewards now. Limited time offers.

Get Started
Game Offer

🎮 Unlock Premium Game Packs

Boost your favorite game with hidden bonuses.

Claim Now
Money Offers

💸 Earn Instantly With This Task

No fees, no waiting — your earnings could be 1 click away.

Start Earning
Crypto Airdrop

🚀 Claim Free Crypto in Seconds

Register & grab real tokens now. Zero investment needed.

Get Tokens
Food Offers

🍔 Get Free Food Coupons

Claim your free fast food deals instantly.

Grab Coupons
VIP Offers

🎉 Join Our VIP Club

Access secret deals and daily giveaways.

Join Now
Mystery Offer

🎁 Mystery Gift Waiting for You

Click to reveal your surprise prize now!

Reveal Gift
App Bonus

📱 Download & Get Bonus

New apps giving out free rewards daily.

Download Now
Exclusive Deals

💎 Exclusive Offers Just for You

Unlock hidden discounts and perks.

Unlock Deals
Movie Offer

🎬 Watch Paid Movies Free

Stream your favorite flicks with no cost.

Watch Now
Prize Offer

🏆 Enter to Win Big Prizes

Join contests and win amazing rewards.

Enter Now
Life Hack

💡 Simple Life Hack to Save Cash

Try this now and watch your savings grow.

Learn More
Top Apps

📲 Top Apps Giving Gifts

Download & get rewards instantly.

Get Gifts
Summer Drinks

🍹 Summer Cocktails Recipes

Make refreshing drinks at home easily.

Get Recipes

Latest Posts

A Coding Implementation to Build a Unified Apache Beam Pipeline Demonstrating Batch and Stream Processing with Event-Time Windowing Using DirectRunner


In this tutorial, we demonstrate how to build a unified Apache Beam pipeline that works seamlessly in both batch and stream-like modes using the DirectRunner. We generate synthetic, event-time–aware data and apply fixed windowing with triggers and allowed lateness to demonstrate how Apache Beam consistently handles both on-time and late events. By switching only the input source, we keep the core aggregation logic identical, which helps us clearly understand how Beam’s event-time model, windows, and panes behave without relying on external streaming infrastructure. Check out the FULL CODES here.

!pip -q install -U "grpcio>=1.71.2" "grpcio-status>=1.71.2"
!pip -q install -U apache-beam crcmod


import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions, StandardOptions
from apache_beam.transforms.window import FixedWindows
from apache_beam.transforms.trigger import AfterWatermark, AfterProcessingTime, AccumulationMode
from apache_beam.testing.test_stream import TestStream
import json
from datetime import datetime, timezone

We install the required dependencies and ensure version compatibility so that Apache Beam. We import the core Beam APIs along with windowing, triggers, and TestStream utilities needed later in the pipeline. We also bring in standard Python modules for time handling and JSON formatting. Check out the FULL CODES here.

MODE = "stream"
WINDOW_SIZE_SECS = 60
ALLOWED_LATENESS_SECS = 120


def make_event(user_id, event_type, amount, event_time_epoch_s):
   return {"user_id": user_id, "event_type": event_type, "amount": float(amount), "event_time": int(event_time_epoch_s)}


base = datetime.now(timezone.utc).replace(microsecond=0)
t0 = int(base.timestamp())


BATCH_EVENTS = [
   make_event("u1", "purchase", 20, t0 + 5),
   make_event("u1", "purchase", 15, t0 + 20),
   make_event("u2", "purchase",  8, t0 + 35),
   make_event("u1", "refund",   -5, t0 + 62),
   make_event("u2", "purchase", 12, t0 + 70),
   make_event("u3", "purchase",  9, t0 + 75),
   make_event("u2", "purchase",  3, t0 + 50),
]

We define the global configuration that controls window size, lateness, and execution mode. We create synthetic events with explicit event-time timestamps so that windowing behavior is deterministic and easy to reason about. We prepare a small dataset that intentionally includes out-of-order and late events to observe Beam’s event-time semantics. Check out the FULL CODES here.

def format_joined_record(kv):
   user_id, d = kv
   return {
       "user_id": user_id,
       "count": int(d["count"][0]) if d["count"] else 0,
       "sum_amount": float(d["sum_amount"][0]) if d["sum_amount"] else 0.0,
   }


class WindowedUserAgg(beam.PTransform):
   def expand(self, pcoll):
       stamped = pcoll | beam.Map(lambda e: beam.window.TimestampedValue(e, e["event_time"]))
       windowed = stamped | beam.WindowInto(
           FixedWindows(WINDOW_SIZE_SECS),
           allowed_lateness=ALLOWED_LATENESS_SECS,
           trigger=AfterWatermark(
               early=AfterProcessingTime(10),
               late=AfterProcessingTime(10),
           ),
           accumulation_mode=AccumulationMode.ACCUMULATING,
       )
       keyed = windowed | beam.Map(lambda e: (e["user_id"], e["amount"]))
       counts = keyed | beam.combiners.Count.PerKey()
       sums = keyed | beam.CombinePerKey(sum)
       return (
           {"count": counts, "sum_amount": sums}
           | beam.CoGroupByKey()
           | beam.Map(format_joined_record)
       )

We build a reusable Beam PTransform that encapsulates all windowed aggregation logic. We apply fixed windows, triggers, and accumulation rules, then group events by user and compute counts and sums. We keep this transform independent of the data source, so the same logic applies to both batch and streaming inputs. Check out the FULL CODES here.

class AddWindowInfo(beam.DoFn):
   def process(self, element, window=beam.DoFn.WindowParam, pane_info=beam.DoFn.PaneInfoParam):
       ws = float(window.start)
       we = float(window.end)
       yield {
           **element,
           "window_start_utc": datetime.fromtimestamp(ws, tz=timezone.utc).strftime("%H:%M:%S"),
           "window_end_utc": datetime.fromtimestamp(we, tz=timezone.utc).strftime("%H:%M:%S"),
           "pane_timing": str(pane_info.timing),
           "pane_is_first": pane_info.is_first,
           "pane_is_last": pane_info.is_last,
       }


def build_test_stream():
   return (
       TestStream()
       .advance_watermark_to(t0)
       .add_elements([
           beam.window.TimestampedValue(make_event("u1", "purchase", 20, t0 + 5), t0 + 5),
           beam.window.TimestampedValue(make_event("u1", "purchase", 15, t0 + 20), t0 + 20),
           beam.window.TimestampedValue(make_event("u2", "purchase", 8, t0 + 35), t0 + 35),
       ])
       .advance_processing_time(5)
       .advance_watermark_to(t0 + 61)
       .add_elements([
           beam.window.TimestampedValue(make_event("u1", "refund", -5, t0 + 62), t0 + 62),
           beam.window.TimestampedValue(make_event("u2", "purchase", 12, t0 + 70), t0 + 70),
           beam.window.TimestampedValue(make_event("u3", "purchase", 9, t0 + 75), t0 + 75),
       ])
       .advance_processing_time(5)
       .add_elements([
           beam.window.TimestampedValue(make_event("u2", "purchase", 3, t0 + 50), t0 + 50),
       ])
       .advance_watermark_to(t0 + 121)
       .advance_watermark_to_infinity()
   )

We enrich each aggregated record with window and pane metadata so we can clearly see when and why results are emitted. We convert Beam’s internal timestamps into human-readable UTC times for clarity. We also define a TestStream that simulates real streaming behavior using watermarks, processing-time advances, and late data. Check out the FULL CODES here.

def run_batch():
   with beam.Pipeline(options=PipelineOptions([])) as p:
       (
           p
           | beam.Create(BATCH_EVENTS)
           | WindowedUserAgg()
           | beam.ParDo(AddWindowInfo())
           | beam.Map(json.dumps)
           | beam.Map(print)
       )


def run_stream():
   opts = PipelineOptions([])
   opts.view_as(StandardOptions).streaming = True
   with beam.Pipeline(options=opts) as p:
       (
           p
           | build_test_stream()
           | WindowedUserAgg()
           | beam.ParDo(AddWindowInfo())
           | beam.Map(json.dumps)
           | beam.Map(print)
       )


run_stream() if MODE == "stream" else run_batch()

We wire everything together into executable batch and stream-like pipelines. We toggle between modes by changing a single flag while reusing the same aggregation transform. We run the pipeline and print the windowed results directly, making the execution flow and outputs easy to inspect.

In conclusion, we demonstrated that the same Beam pipeline can process both bounded batch data and unbounded, stream-like data while preserving identical windowing and aggregation semantics. We observed how watermarks, triggers, and accumulation modes influence when results are emitted and how late data updates previously computed windows. Also, we focused on the conceptual foundations of Beam’s unified model, providing a solid base for later scaling the same design to real streaming runners and production environments.


Check out the FULL CODES here. Also, feel free to follow us on Twitter and don’t forget to join our 100k+ ML SubReddit and Subscribe to our Newsletter. Wait! are you on telegram? now you can join us on telegram as well.

Check out our latest release of ai2025.dev, a 2025-focused analytics platform that turns model launches, benchmarks, and ecosystem activity into a structured dataset you can filter, compare, and export


Michal Sutter is a data science professional with a Master of Science in Data Science from the University of Padova. With a solid foundation in statistical analysis, machine learning, and data engineering, Michal excels at transforming complex datasets into actionable insights.



Source link

Latest Posts

Don't Miss

Stay in touch

To be updated with all the latest news, offers and special announcements.