Wednesday, October 6, 2021

Apache Kafka: Setting It Up

Kafka short introduction

Kafka is an event based messaging system that safely moves data between systems.

Related Post
Before you start, I suggest reading my previous post, "What is Apache Kafka?".

Let's talk about setup.
You need Java running on your system. Let's check my current Java version: just open a terminal and execute "java -version".

  $ java -version
  java version "1.8.0_301"
  Java(TM) SE Runtime Environment (build 1.8.0_301-b09)
  Java HotSpot(TM) 64-Bit Server VM (build 25.301-b09, mixed mode)
Once Java is running on your system, follow these steps to set up Kafka and get it running:

1. Download & Start service
a. Get the latest Kafka release from the official Kafka download page.
b. Unzip archive and navigate to the extracted directory in console.
c. Run the following commands in order to start all services in the correct order. (new terminal for each command)

    Start the ZooKeeper service
    $ bin/ config/

    Start the Kafka broker service
    $ bin/ config/

2. Create a topic to store your events.
Kafka is a distributed event streaming platform that lets you read, write, store, and process events (also called records or messages in the documentation) across many machines.
Example events are payment transactions, geolocation updates from mobile phones, shipping orders, sensor measurements from IoT devices or medical equipment, and much more. These events are organized and stored in topics. Very simplified, a topic is similar to a folder in a filesystem, and the events are the files in that folder.
So before you can write your first events, you must create a topic (named "quickstart-events"). Open another terminal session and run:
$ bin/ --create --topic quickstart-events --bootstrap-server localhost:9092

3. Write some events into the topic
A Kafka client communicates with the Kafka brokers via the network for writing (or reading) events. Once received, the brokers will store the events in a durable and fault-tolerant manner for as long as you need—even forever.
Run the console producer client to write a few events into your topic. By default, each line you enter will result in a separate event being written to the topic.

$ bin/ --topic quickstart-events --bootstrap-server localhost:9092
  This is my first event
  This is my second event
4. Read the events
Open another terminal session and run the console consumer client to read the events you just created:

$ bin/ --topic quickstart-events --from-beginning --bootstrap-server localhost:9092
This is my first event
This is my second event
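
To picture what the producer and consumer steps did, here is a minimal sketch in plain JavaScript (Node.js). It is only a toy in-memory model, not the Kafka client API: the names ToyTopic, append and readFrom are made up for this illustration.

```javascript
// Toy in-memory model of a topic: an append-only list of events.
// This is NOT the Kafka client API; every name here is hypothetical.
class ToyTopic {
  constructor(name) {
    this.name = name;
    this.events = []; // the array index plays the role of the offset
  }

  // Append an event and return the offset it was stored at.
  append(value) {
    this.events.push(value);
    return this.events.length - 1;
  }

  // Return every event starting at the given offset (0 = from the beginning).
  readFrom(offset = 0) {
    return this.events.slice(offset);
  }
}

const topic = new ToyTopic('quickstart-events');
topic.append('This is my first event');
topic.append('This is my second event');

// Reading does not remove events, so a second read returns them again.
const firstRead = topic.readFrom(0);
const secondRead = topic.readFrom(0);
console.log(firstRead, secondRead);
```

The point of the sketch is that reading leaves the stored events in place, which is why the console consumer can replay them with --from-beginning.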

5. Higher Level Diagram

So this was about the setup of "Apache Kafka". In the next post we will use Kafka in a Node.js application.

What is Apache Kafka?

Apache Kafka is an event streaming platform.

Let's talk about it in detail.
Kafka is an event based messaging system that safely moves data between systems. Depending on how each component is configured, it can act as a transport for real-time event tracking or as a replicated distributed database.
Kafka combines three key capabilities so you can implement your use cases for event streaming end-to-end with a single battle-tested solution:
  • To publish (write) and subscribe to (read) streams of events, including continuous import/export of your data from other systems.
  • To store streams of events durably and reliably for as long as you want.
  • To process streams of events as they occur or retrospectively.
And all this functionality is provided in a distributed, highly scalable, elastic, fault-tolerant, and secure manner.

How does Apache Kafka work?
Kafka is a distributed system consisting of servers and clients that communicate via a high-performance TCP network protocol.

Servers: Kafka is run as a cluster of one or more servers that can span multiple datacenters or cloud regions. Some of these servers form the storage layer, called the brokers. Other servers run Kafka Connect to continuously import and export data as event streams to integrate Kafka with your existing systems such as relational databases as well as other Kafka clusters.

Clients: They allow you to write distributed applications and microservices that read, write, and process streams of events in parallel, at scale, and in a fault-tolerant manner even in the case of network problems or machine failures. Kafka ships with some such clients included, which are augmented by dozens of clients provided by the Kafka community: clients are available for Java and Scala including the higher-level Kafka Streams library, for Go, Python, C/C++, and many other programming languages as well as REST APIs.

Kafka APIs
In addition to command line tooling for management and administration tasks, Kafka has five core APIs for Java and Scala:
  1. The Admin API to manage and inspect topics, brokers, and other Kafka objects.
  2. The Producer API to publish (write) a stream of events to one or more Kafka topics.
  3. The Consumer API to subscribe to (read) one or more topics and to process the stream of events produced to them.
  4. The Kafka Streams API to implement stream processing applications and microservices.
  5. The Kafka Connect API to build and run reusable data import/export connectors that consume (read) or produce (write) streams of events from and to external systems and applications so they can integrate with Kafka.

Let's see some use cases
  • Messaging: Kafka works well as a replacement for a more traditional message broker. Message brokers are used for a variety of reasons (to decouple processing from data producers, to buffer unprocessed messages, etc). In comparison to most messaging systems Kafka has better throughput, built-in partitioning, replication, and fault-tolerance which makes it a good solution for large scale message processing applications.
  • Website Activity Tracking: The original use case for Kafka was to be able to rebuild a user activity tracking pipeline as a set of real-time publish-subscribe feeds. This means site activity (page views, searches, or other actions users may take) is published to central topics with one topic per activity type.
  • Metrics: Kafka is often used for operational monitoring data. This involves aggregating statistics from distributed applications to produce centralised feeds of operational data.
  • Log Aggregation: Many people use Kafka as a replacement for a log aggregation solution. Log aggregation typically collects physical log files.
  • Stream Processing: Many users of Kafka process data in processing pipelines consisting of multiple stages, where raw input data is consumed from Kafka topics and then aggregated, enriched, or otherwise transformed into new topics for further consumption or follow-up processing.
  • Event Sourcing: Event sourcing is a style of application design where state changes are logged as a time-ordered sequence of records. Kafka's support for very large stored log data makes it an excellent backend for an application built in this style.
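
To make the event sourcing use case a bit more concrete, here is a small sketch in plain JavaScript. It assumes a made-up account example: the event types 'deposited' and 'withdrawn' and the functions applyEvent and replay are hypothetical, and no Kafka client is involved. The array simply stands in for a time-ordered log of records.

```javascript
// Hypothetical account events, listed in the order they happened.
const events = [
  { type: 'deposited', amount: 100 },
  { type: 'withdrawn', amount: 30 },
  { type: 'deposited', amount: 5 },
];

// Apply one event to the current state and return the new state.
function applyEvent(state, event) {
  switch (event.type) {
    case 'deposited':
      return { balance: state.balance + event.amount };
    case 'withdrawn':
      return { balance: state.balance - event.amount };
    default:
      return state; // ignore event types this sketch does not know
  }
}

// Rebuild the state by replaying the log from the start.
function replay(log) {
  return log.reduce(applyEvent, { balance: 0 });
}

const currentState = replay(events);
console.log(currentState);
```

The state is never stored directly. It is always derived from the log, so replaying a shorter prefix of the log gives the state as it was at that earlier point.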

Let's see some common terms

Cluster

The collective group of machines that Kafka is running on.

Broker

A single Kafka instance.

Topic

Topics are used to organize data. You always read from and write to a particular topic.

Partition

Data in a topic is spread across a number of partitions. Each partition can be thought of as a log file, ordered by time. To guarantee that messages are read in the correct order, only one consumer instance in a consumer group can read from a particular partition at a time.

Producer

A client that writes data to one or more Kafka topics.

Consumer

A client that reads data from one or more Kafka topics.

Replica

Partitions are typically replicated to one or more brokers to avoid data loss.

Leader

Although a partition may be replicated to one or more brokers, a single broker is elected the leader for that partition, and by default it is the only one that clients write to or read from for that partition.

Consumer group

A collective group of consumer instances, identified by a groupId. In a horizontally scaled application, each instance would be a consumer and together they would act as a consumer group.

Group Coordinator

The broker responsible for managing the members of a consumer group and for coordinating the assignment of partitions to the consumers in the group.

Offset

A certain point in the partition log. When a consumer has consumed a message, it "commits" that offset, meaning that it tells the broker that the consumer group has consumed that message. If the consumer group is restarted, it will restart from the highest committed offset.
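
The ideas of partitions, consumer groups and committed offsets described above can be sketched with a toy model in plain JavaScript. Everything here is an assumption made for illustration: the names toyHash, produce and consume are hypothetical, the hash is not the one Kafka's default partitioner uses, and real Kafka clients handle all of this for you.

```javascript
// Toy model of partitions, keys and committed offsets.
// All names are hypothetical; this is not the Kafka client API.
const PARTITION_COUNT = 3;
const partitions = Array.from({ length: PARTITION_COUNT }, () => []);

// Simple string hash, used only for this sketch.
function toyHash(key) {
  let hash = 0;
  for (const ch of key) {
    hash = (hash * 31 + ch.charCodeAt(0)) >>> 0;
  }
  return hash;
}

// Events with the same key always land in the same partition.
function produce(key, value) {
  const partition = toyHash(key) % PARTITION_COUNT;
  partitions[partition].push({ key, value });
  return partition;
}

// Committed position per consumer group and partition: the next offset to read.
const committed = {};

function consume(groupId, partition) {
  const id = `${groupId}:${partition}`;
  const start = committed[id] || 0;
  const batch = partitions[partition].slice(start);
  committed[id] = start + batch.length; // "commit" after processing the batch
  return batch;
}

const p1 = produce('user-42', 'page view');
const p2 = produce('user-42', 'search');
const firstBatch = consume('my-group', p1);  // both events so far
produce('user-42', 'logout');
const secondBatch = consume('my-group', p1); // only the new event
console.log(p1 === p2, firstBatch.length, secondBatch.length);
```

Because the group's position is stored separately from the log, a different groupId would start again from offset 0 and see all three events.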

So this was an introductory post about "Apache Kafka". In the next post I will explain its basic setup.

Monday, August 14, 2017

Hello World - Kinvey Flex SDK

Hello Friends,

Recently I started working with Node.js for the backend of one of our projects. We are using Kinvey for our backend, and I'm responsible for all the backend architecture and development. So in this post I'm going to show you how to use Kinvey's Flex SDK to create a simple Node service and run it.

1 - Create package
$ mkdir helloworld
$ cd helloworld
$ npm init

2 - Install the Kinvey Flex SDK in your Node package.
$ npm install kinvey-flex-sdk --save

3 - To use the Kinvey Flex module in your project, add a require as follows.
const sdk = require('kinvey-flex-sdk');
4 - Check IP address of your work station or server.
My macbook has

5 - Write flex function code

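// File : index.js
const sdk = require('kinvey-flex-sdk');
sdk.service({ host: '', port: 8080 }, function (err, flex) {
    // Stop early if the service could not be started
    if (err) {
        throw err;
    }

    const flexFunctions = flex.functions;   // gets the FlexFunctions object from the service

    // Handler that answers every request with a fixed JSON body
    function helloWorldHandler(request, complete, modules) {
        return complete({ "message": "Hello World" }).ok().done();
    }

    // Expose the handler under the name 'helloWorld'
    flexFunctions.register('helloWorld', helloWorldHandler);
});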

6 - Run service
$ node index.js
Service listening on 8080

7 - Send a request to our flex function. Execute the following command to see the response from our Node service written with Kinvey's Flex SDK.

$ curl -X POST -H "Content-Type: application/json"  -d '{}' ""

The above command will return the following JSON response.
{"request":{"method":"POST","headers":{},"username":"","userId":"","objectName":"","tempObjectStore":{},"body":{}},"response":{"status":0,"headers":{},"body":{"message":"Hello World"},"statusCode":200,"continue":false}}
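
If you want to read this response from code, the useful parts sit under "response". Here is a minimal sketch in plain JavaScript that parses the text shown above. The helper name summarize is made up for this example and is not part of the Flex SDK.

```javascript
// Response text copied from the curl output above.
const raw = '{"request":{"method":"POST","headers":{},"username":"","userId":"","objectName":"","tempObjectStore":{},"body":{}},"response":{"status":0,"headers":{},"body":{"message":"Hello World"},"statusCode":200,"continue":false}}';

const parsed = JSON.parse(raw);

// Pull out the parts that matter to a caller (hypothetical helper).
function summarize(result) {
  return {
    statusCode: result.response.statusCode,
    message: result.response.body.message,
  };
}

const summary = summarize(parsed);
console.log(summary);
```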

I hope this post helps you understand the basics of the Kinvey Flex SDK. Later I will try to write some posts on advanced concepts of the Flex SDK.


Tuesday, July 8, 2014

How to Post tweet from Symfony2 application

This post will guide you through building a simple Twitter service for a Symfony2 application using the Codebird-php library. I decided to write one simple post on using this cool library in Symfony2.

A – Create twitter application

Creating a Twitter App is easy (see below steps)

A-1 : Go to the Twitter developer site and log in with your account.

A-2 : Click 'Create a new application' button.

A-3 : Complete the required fields and press 'Create your Twitter application'.

A-4 : Open 'Settings' TAB and set Application Type to 'Read and Write'. Then press 'Update this Twitter application's settings'

A-5 : Go to Details TAB and press 'Create my access token' button

A-6 : Go to oAuth tool TAB and get your access tokens

After creating the Twitter App, you will get following credentials to communicate using Twitter API.

1. ConsumerKey
2. ConsumerSecret
3. AccessToken
4. AccessTokenSecret

B – Install twitter library “Codebird-php” in Symfony2 project
Execute "php composer.phar require jublonet/codebird-php:dev-master" in the root of your Symfony2 project directory. It will install the codebird-php library in your project.

C – Add Twitter application tokens and keys in parameters.yml
Add the following parameters under "parameters:"
    hm_twitterApiKey: "your-api-key"
    hm_twitterApiSecret: "your-api-secret"
    hm_twitterApiToken: "your-api-token"
    hm_twitterApiTokenSecret: "your-api-token-secret"

D – Create and register twitter service
Now, let's create the service. It will allow you to send a tweet to Twitter from any controller or any other place of your choice.

D-1 : Create service class, with suitable namespace

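    namespace Rm\DemoBundle\Services;

    use Codebird\Codebird;

    /**
     * Service to handle twitter post
     * @author Rajesh Meniya
     */
    class TwitterService
    {
        /**
         * Service container
         * @var container
         */
        protected $container;

        /**
         * construct
         */
        public function __construct($container)
        {
            $this->container = $container;
        }

        public function postTweet($status)
        {
            // Authenticate with the keys and tokens stored in parameters.yml (section C)
            $c = $this->container;
            Codebird::setConsumerKey($c->getParameter('hm_twitterApiKey'), $c->getParameter('hm_twitterApiSecret'));
            $cb = Codebird::getInstance();
            $cb->setToken($c->getParameter('hm_twitterApiToken'), $c->getParameter('hm_twitterApiTokenSecret'));

            $reply = $cb->statuses_update('status=' . $status);
            return $reply;
        }
    }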

D-2 : Register your service in your services.yml file
    services:
        hmgmt.twitter:
            class: Rm\DemoBundle\Services\TwitterService
            arguments:
                container: "@service_container"

E – Post tweet using your service

Now you can use your Twitter service in your application to send a tweet. For example, you can post a tweet from your controller action like this:

    $tweetText = "My new status from symfony application, just for test";
    $result = $this->get('hmgmt.twitter')->postTweet($tweetText);


Saturday, August 10, 2013

Symfony 2.2 project admin using Sonata admin bundles

Hello Friends,

This is my second post on building a Symfony admin panel using the Sonata admin bundles. In this post we will create a basic administrator panel for a Symfony 2.2 project with authentication plus user and group management. So let's start.

1 - Download & setup
Download the Symfony 2.2 package with vendors, give the cache and logs directories proper write permissions, and configure a vhost so you can access the project in the browser.

2 - Check version of your Symfony package
> php app/console -V
> Symfony version 2.2.3 - app/dev/debug

3 - Install required bundles
Install sonata-project/block-bundle
> php composer.phar require sonata-project/block-bundle:2.2.3

Install sonata-project/jquery-bundle
> php composer.phar require sonata-project/jquery-bundle:1.8.0

Install knplabs/knp-menu
> php composer.phar require knplabs/knp-menu:1.1.2

Install knplabs/knp-menu-bundle
> php composer.phar require knplabs/knp-menu-bundle:1.1.2

Install sonata-project/exporter
> php composer.phar require sonata-project/exporter:1.3.0

Install sonata-project/admin-bundle
> php composer.phar require sonata-project/admin-bundle:2.2.2

Install sonata-project/doctrine-orm-admin-bundle
> php composer.phar require sonata-project/doctrine-orm-admin-bundle:2.2.*@dev

Install sonata-project/intl-bundle
> php composer.phar require sonata-project/intl-bundle:2.1.*

4 - Alter your config.yml and add following configurations.
# app/config/config.yml
# Sonata block Configuration
sonata_block:
    default_contexts: [cms]
    blocks:
        sonata.admin.block.admin_list:
            contexts:   [admin]

        #sonata.admin_doctrine_orm.block.audit:
        #    contexts:   [admin]

# knp menu configuration (optional)
knp_menu:
    twig:
        template: knp_menu.html.twig
    templating: false
    default_renderer: twig

# Translator configuration
framework:
    translator: ~

# Sonata intl configuration
sonata_intl:
    timezone:
        # default timezone used as fallback
        default: Europe/Paris

        # locale specific overrides
        locales:
            fr: Europe/Paris
            en_UK: Europe/London

5 - Be sure to enable these bundles in your AppKernel.php file.
// app/AppKernel.php
public function registerBundles()
{
    return array(
        // ...
        new Sonata\BlockBundle\SonataBlockBundle(),
        new Knp\Bundle\MenuBundle\KnpMenuBundle(),
        new Sonata\jQueryBundle\SonatajQueryBundle(),
        new Sonata\AdminBundle\SonataAdminBundle(),
        new Sonata\DoctrineORMAdminBundle\SonataDoctrineORMAdminBundle(),
        new Sonata\IntlBundle\SonataIntlBundle(),
        // ...
    );
}

6 - Install the assets from the bundles
> php app/console assets:install web

7 - Now let's add routes for the Sonata admin bundle.
# app/config/routing.yml
admin:
    resource: '@SonataAdminBundle/Resources/config/routing/sonata_admin.xml'
    prefix: /admin

_sonata_admin:
    resource: .
    type: sonata_admin
    prefix: /admin

8 - Delete cache files
> php app/console cache:clear

9 - Install FOS & sonata-project/user-bundle
9-A :Install friendsofsymfony/user-bundle
> php composer.phar require friendsofsymfony/user-bundle:1.3.2

9-B :Install sonata-project/user-bundle
> php composer.phar require sonata-project/user-bundle:2.1.1

10 - Be sure to enable these bundles in your AppKernel.php file.
// app/AppKernel.php
public function registerBundles()
{
    return array(
        // ...
        new Sonata\BlockBundle\SonataBlockBundle(),
        new Knp\Bundle\MenuBundle\KnpMenuBundle(),
        new Sonata\jQueryBundle\SonatajQueryBundle(),
        new Sonata\AdminBundle\SonataAdminBundle(),
        new Sonata\DoctrineORMAdminBundle\SonataDoctrineORMAdminBundle(),
        new Sonata\IntlBundle\SonataIntlBundle(),

        new FOS\UserBundle\FOSUserBundle(),
        new Sonata\UserBundle\SonataUserBundle('FOSUserBundle'),
        new Sonata\EasyExtendsBundle\SonataEasyExtendsBundle(),
        // ...
    );
}

11 - Alter config.yml and add configurations for FOS user bundle.
# app/config/config.yml

# FOS user bundle configuration
fos_user:
    db_driver: orm
    firewall_name: main
    user_class: Application\Sonata\UserBundle\Entity\User

12 - Generate & configure User bundle
12-A : Generate user bundle
This bundle gives you an interface to manage users in the application. Execute the command below to generate the User bundle by extending SonataUserBundle.
> php app/console sonata:easy-extends:generate SonataUserBundle

12-B : Register generated user bundle in autoload.php
Add below line in autoload.php
$loader->add('Application', __DIR__);

Your autoload.php file should look like this:
    use Doctrine\Common\Annotations\AnnotationRegistry;
    use Composer\Autoload\ClassLoader;

    /**
     * @var $loader ClassLoader
     */
    $loader = require __DIR__.'/../vendor/autoload.php';

    // intl
    if (!function_exists('intl_get_error_code')) {
        require_once __DIR__.'/../vendor/symfony/symfony/src/Symfony/Component/Locale/Resources/stubs/functions.php';
    }

    $loader->add('Application', __DIR__);

    AnnotationRegistry::registerLoader(array($loader, 'loadClass'));

    return $loader;

12-C : Register User bundle in app/AppKernel.php
Add the line below to register the generated user bundle.
    new Application\Sonata\UserBundle\ApplicationSonataUserBundle()

Your registerBundles() method then looks like this:
    // app/AppKernel.php
    public function registerBundles()
    {
        return array(
            // ...
            new Sonata\BlockBundle\SonataBlockBundle(),
            new Knp\Bundle\MenuBundle\KnpMenuBundle(),
            new Sonata\jQueryBundle\SonatajQueryBundle(),
            new Sonata\AdminBundle\SonataAdminBundle(),
            new Sonata\DoctrineORMAdminBundle\SonataDoctrineORMAdminBundle(),
            new Sonata\IntlBundle\SonataIntlBundle(),

            new FOS\UserBundle\FOSUserBundle(),
            new Sonata\UserBundle\SonataUserBundle('FOSUserBundle'),
            new Sonata\EasyExtendsBundle\SonataEasyExtendsBundle(),
            new Application\Sonata\UserBundle\ApplicationSonataUserBundle(),
            // ...
        );
    }

12-D : Add FOS and Sonata user routes in app/config/routing.yml
    fos_user_security:
        resource: "@FOSUserBundle/Resources/config/routing/security.xml"

    fos_user_profile:
        resource: "@FOSUserBundle/Resources/config/routing/profile.xml"
        prefix: /profile

    fos_user_register:
        resource: "@FOSUserBundle/Resources/config/routing/registration.xml"
        prefix: /register

    fos_user_resetting:
        resource: "@FOSUserBundle/Resources/config/routing/resetting.xml"
        prefix: /resetting

    fos_user_change_password:
        resource: "@FOSUserBundle/Resources/config/routing/change_password.xml"
        prefix: /change-password

    sonata_user:
        resource: '@SonataUserBundle/Resources/config/routing/admin_security.xml'
        prefix: /admin

    sonata_user_impersonating:
        pattern: /
        defaults: { _controller: SonataPageBundle:Page:catchAll }

12-E : Clear cache
> php app/console cache:clear

13 - Update db schema
> php app/console doctrine:schema:update --force
NOTE : If Symfony gives an 'Unknown column type "json" requested.......' error, add the custom type to the Doctrine configuration as shown below.

# app/config/config.yml
doctrine:
    dbal:
        types:
            # for SonataNotificationBundle
            json: Sonata\Doctrine\Types\JsonType

14 - Add sonata admin configuration
# app/config/config.yml
sonata_admin:
    title:      <your application title>
    title_logo: /path/to/logo.png
    dashboard:
        blocks:
            # display a dashboard block
            - { position: left, type: sonata.admin.block.admin_list }

15 - Let's create a super admin user
All configuration is complete, so let's create a super admin user for our newly created Symfony 2.2 admin panel.
> php app/console fos:user:create <username> <email> <password> --super-admin

Now you can access your admin panel using the URL below.
URL : http://yourdomain/app_dev.php/admin/dashboard

16 - Bundles url

Now, our fully functional Symfony2.2 admin panel is ready.

Please post your comments, share my post and join this blog.

Rajesh Meniya
Symfony Developer