github: github.com/mensfeld
www: mensfeld.pl
twitter: @maciejmensfeld
e-mail: maciej@mensfeld.pl
Talk to me about:
Karafka, Kafka, Ruby, Poland, System Architecture, High throughput data processing,
dry-rb, Trailblazer, TCP, Ruby on Rails or anything else TBH :)
Event-driven architecture (EDA) is a software architecture pattern promoting the production, detection, consumption of, and reaction to events.
Image source: Wiki
In object-oriented and functional programming, an immutable object (unchangeable object) is an object whose state cannot be modified after it is created.
Image source: Wiki
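In Ruby terms (a minimal illustration, not from the original slide):

# Freezing an object makes it immutable: further mutation raises an error
cart_item = { cart_id: 1, item: 'Book about Erlang' }.freeze
cart_item[:item] = 'Book about Ruby' # => FrozenError: can't modify frozen Hash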
Domain-driven design is an approach to developing software for complex needs by deeply connecting the implementation to an evolving model of the core business concepts.
Image source: DDD Community
Instead of focusing on current state, you focus on the changes that have occurred over time. It is the practice of modelling your system as a sequence of events.
Image source: Dev.to
The Saga pattern describes how to solve business transactions (...) in distributed systems. The basic idea is to break the overall transaction into multiple steps. (...) the overall consistency is taken care of by the Saga.
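A minimal Ruby sketch of the idea (all service objects here are hypothetical, purely to show the step/compensation structure):

# Each step has a compensating action that undoes it if a later step fails,
# so overall consistency is restored without one big ACID transaction.
class OrderSaga
  STEPS = [
    { run: ->(order) { ReserveStock.call(order) },     undo: ->(order) { ReleaseStock.call(order) } },
    { run: ->(order) { ChargeCard.call(order) },       undo: ->(order) { RefundCard.call(order) } },
    { run: ->(order) { ScheduleShipping.call(order) }, undo: ->(order) { CancelShipping.call(order) } }
  ].freeze

  def call(order)
    completed = []

    STEPS.each do |step|
      step[:run].call(order)
      completed << step
    end
  rescue StandardError
    # Compensate the already-completed steps in reverse order
    completed.reverse_each { |step| step[:undo].call(order) }
    raise
  end
end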
| Event type | Params |
| --- | --- |
| Item added | { cart_id: 1, item: 'Book about Erlang' } |
| Item added | { cart_id: 1, item: 'Book about Ruby' } |
| Item removed | { cart_id: 1, item: 'Book about Ruby' } |
| Item | Quantity |
| --- | --- |
| Book about Erlang | 1 |
| Book about Ruby | 0 |
Image source: Joel Semeniuk article
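A sketch of how that state could be derived by replaying the events above (the reducer is illustrative; payload shapes follow the first table):

events = [
  { type: 'Item added',   params: { cart_id: 1, item: 'Book about Erlang' } },
  { type: 'Item added',   params: { cart_id: 1, item: 'Book about Ruby' } },
  { type: 'Item removed', params: { cart_id: 1, item: 'Book about Ruby' } }
]

# Replay the event log to rebuild the current cart state
cart = Hash.new(0)

events.each do |event|
  item = event[:params][:item]

  case event[:type]
  when 'Item added'   then cart[item] += 1
  when 'Item removed' then cart[item] -= 1
  end
end

cart # => { 'Book about Erlang' => 1, 'Book about Ruby' => 0 }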
You may not yet be aware of future use-cases of the data you now possess.
Image source: Twitter
A consumer group is a set of consumers sharing a common group identifier.
Image source: Apache Kafka for service architecture
Image source: Ben Stopford's talk
If an instance of a service dies, data is redirected and ordering guarantees are maintained.
Image source: Ben Stopford's talk
We needed a tool that would allow us to:
Because:
aren't business logic
Karafka's core is composed of a few logical parts:
# Gemfile
source 'https://rubygems.org'
gem 'karafka'
bundle install
bundle exec karafka install
Update karafka.rb with your configuration settings and run:
bundle exec karafka server
All the configuration options are described here:
github.com/karafka/karafka
Everything is validated to ensure you won't do anything stupid:
class KarafkaApp < Karafka::App
  setup do |config|
    config.client_id = 'my_application'
    config.backend = :inline
    config.batch_fetching = true
    config.batch_consuming = true
    config.kafka.seed_brokers = %w[kafka://127.0.0.1:9092]
    config.shutdown_timeout = -1 # invalid on purpose: validation will catch it
  end
end
class KarafkaApp < Karafka::App
  setup do |config|
    config.kafka.seed_brokers = %w[kafka://127.0.0.1:9092]
    config.client_id = 'example_app'
  end
end
Routing engine
It provides an interface to describe how messages from all the topics should be fetched and how they should be consumed.
KarafkaApp.consumer_groups.draw do
  consumer_group :group_name do
    topic(:videos_created) { consumer VideosConsumer }

    topic :binary_video_details do
      consumer BinaryConsumer
      parser Parsers::BinaryToJson
      responder BinaryVideoProcessingResponder
      batch_consuming true
    end
  end
end
Not much needs to be done if you work with JSON data:
KarafkaApp.consumer_groups.draw do
  consumer_group :commit_builds do
    topic(:builds_created) { consumer CreateBuildsConsumer }
    topic(:builds_done)    { consumer UpdateBuildConsumer }
    topic(:builds_expired) { consumer ExpireBuildConsumer }
  end
end
class CreateVideosConsumer < Karafka::BaseConsumer
  def consume
    params_batch.each do |params|
      respond_with Video.create!(params['video'])
    end
  end
end
Responders help prevent bugs when you design receive-respond applications that handle work across multiple incoming and outgoing topics
class ExampleResponder < ApplicationResponder
  topic :users_notified

  def respond(user)
    respond_to :users_notified, user
  end
end
bundle exec karafka [COMMAND]
karafka console
karafka flow
karafka help [COMMAND]
karafka info
karafka install
karafka server
commit_builds_imported =>
- commit_builds_scheduled: (always, one or more)
commit_builds_processed =>
- commit_builds_copied: (always, one or more)
sources_received =>
- commit_builds_imported: (always, exactly once)
repositories_created => (nothing)
repositories_deleted => (nothing)
repositories_updated => (nothing)
repositories_tiers_updated => (nothing)
During a message or batch lifecycle within the Karafka framework, there are two crucial moments:
Depending on your application and/or consumer group settings, Karafka's consumer can consume messages in two modes:
class TweetsConsumer < Karafka::BaseConsumer
  def consume
    tweets = params_batch
      .map { |params| params.fetch('tweet') }
      .map { |attributes| Tweets::New.call(attributes)['model'] }

    Tweets::Import.call tweets
  end
end
class TweetsConsumer < Karafka::BaseConsumer
  def consume
    Tweets::Create.call(params.fetch('tweet'))
  end
end
Messages inside a batch are lazily parsed upon first usage
IGNORE_LIMIT = 60 # 1 minute

def consume
  # Ignore messages that were created more than IGNORE_LIMIT seconds ago
  limit = Time.now - IGNORE_LIMIT

  params_batch.to_a.each do |unparsed_event|
    next if unparsed_event.create_time < limit

    EventReactor.call(unparsed_event.parse!)
  end
end
BUFFER_SIZE = 1000

def consume
  # concat, not <<: << would append the whole array as a single element
  buffer.concat(params_batch.map { |params| params.fetch('event') })

  if buffer.size >= BUFFER_SIZE
    data = buffer.shift(BUFFER_SIZE)
    Event.import_batch(data)
  end
end

def buffer
  @buffer ||= []
end
Imagine a transaction that spans thousands of requests
A transaction for which you need no new persistence layer until you have all the data you need
buffer = []
customer_id = 1

params_batch.each do |params|
  # Track only the customer this saga instance cares about
  next unless params['customer_id'] == customer_id

  case params.fetch('type')
  when 'added_to_cart'
    buffer << params
  when 'completed_purchase'
    Events.import(buffer)
    consumer.mark_message_as_processed(params)
    buffer = []
  end
end
Karafka uses dry-monitor as an instrumentation layer to which you can easily hook up your own listeners. You can use it to develop your own monitoring and logging systems.
key = 'params.params.parse.error'

KarafkaApp.monitor.subscribe key do |event|
  puts "Oh no! An error: #{event[:error]}"
end
if KarafkaApp.env.production?
  KarafkaApp.monitor.subscribe(AirbrakeListener)
  KarafkaApp.monitor.subscribe(DataDogListener)
  KarafkaApp.monitor.subscribe(LogstashListener)
else
  KarafkaApp.monitor.subscribe(StdoutListener)
end
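A listener is just an object responding to on_* methods named after the instrumented events, with dots replaced by underscores. A hypothetical sketch, assuming the Airbrake gem is available:

class AirbrakeListener
  # Receives the 'params.params.parse.error' event shown above
  def self.on_params_params_parse_error(event)
    Airbrake.notify(event[:error])
  end
end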
You can deploy and run Karafka on various platforms using:
Some Kafka cloud providers require topics to be namespaced with a username. This approach is understandable, but it makes your applications less provider-agnostic.
To address that issue, you can create your own topic mapper that sanitizes incoming and outgoing topic names, so your logic won't be bound to provider-specific topic names.
class KarafkaTopicMapper
  def initialize(prefix)
    @prefix = prefix
  end

  def incoming(topic)
    topic.to_s.gsub("#{@prefix}.", '')
  end

  def outgoing(topic)
    "#{@prefix}.#{topic}"
  end
end
mapper = KarafkaTopicMapper.new('maciej')
mapper.incoming('maciej.my_super_topic') #=> 'my_super_topic'
mapper.outgoing('my_other_topic') #=> 'maciej.my_other_topic'
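To plug it in, point the topic_mapper setting at an instance (shown against the 1.x configuration API):

class KarafkaApp < Karafka::App
  setup do |config|
    # All incoming/outgoing topic names now go through the mapper
    config.topic_mapper = KarafkaTopicMapper.new('maciej')
  end
end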
# Rails create action
def create
  @user = User.create(user_params)

  if @user.valid?
    # WaterDrop expects a serialized message payload
    WaterDrop::AsyncProducer.call(@user.to_json, topic: 'users')
  end

  respond_with @user
end
# karafka.rb: boot Rails first so consumers can use application code
ENV['RAILS_ENV'] ||= 'development'
ENV['KARAFKA_ENV'] = ENV['RAILS_ENV']

require ::File.expand_path('../config/environment', __FILE__)
Rails.application.eager_load!
Because Karafka acts as a message transportation layer, it works really well with other modern Ruby solutions, including:
Example integration with Trailblazer + Reform
class CommitBuildsCopyController < KarafkaController
  def perform
    CommitBuilds::Copy.call(
      params_batch.map { |params| params.fetch('value') }
    )
  end
end
Problem: based on some internal events, we need to send HTTP events to our customers
What if we could:
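One possible shape of such a consumer (a hypothetical sketch: Webhooks::Endpoint and the payload format are assumptions, not part of the original slides):

require 'net/http'
require 'json'

class WebhooksConsumer < Karafka::BaseConsumer
  def consume
    params_batch.each do |params|
      event = params.fetch('event')

      # Look up the customer's callback URL and POST the event as JSON
      uri = URI(Webhooks::Endpoint.for(event['customer_id']))
      Net::HTTP.post(uri, event.to_json, 'Content-Type' => 'application/json')
    end
  end
end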
In the fall of 2017, Yahoo announced that all their users (3 billion accounts) were hacked. Equifax announced that credit information and personal information for 143 million consumers was exposed. Dropbox announced in 2016 that 68 million user accounts were compromised.
Our approach to compromise prevention is simple: fully understand the behavior patterns of your user base at every level, and use that understanding to stop any unauthorized entities from accessing accounts in real time.
While hackers might have an easy time obtaining credentials to log into a user account, mimicking all behaviors of a stolen user account at all points in an application is virtually impossible, for even the most experienced hacker.
Device level signals:
IP, browser, user agent, language, device type, screen resolution, battery level, plugins
UI interactions:
Mouse movements, keystrokes, touch interactions, site navigation, user details and attributes
We use all these signals to create holistic device fingerprints in real time, every time the user interacts with the application.
The fingerprints serve as building blocks for our behavior models, and anytime there is a single change to a fingerprint, our dynamic risk scores are updated accordingly in real time via an asynchronous call.
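Mapped onto Karafka, that pipeline could look roughly like this (all class names are illustrative assumptions):

class SignalsConsumer < Karafka::BaseConsumer
  def consume
    params_batch.each do |params|
      signal = params.fetch('signal')

      # Refresh the device fingerprint with the new signal and, if it
      # changed, recompute the risk score asynchronously
      fingerprint = Fingerprints::Update.call(signal)
      RiskScores::Recalculate.async_call(fingerprint) if fingerprint.changed?
    end
  end
end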
We're currently working on a 1.3 release: