The point of this talk is not for me to reach the end of of it.
The point is for me to teach you something.
So if I lose you at any point, interrupt me immediately.
I love questions.
Maciej Mensfeld
Every good work of software starts by scratching a developer's personal itch.Eric Raymond, The Cathedral and the Bazaar
The same applies to Ruby Gems and other libraries.
*tests not included, just code and comments
*tests not included, just code and comments
*tests not included, just code and comments
*tests not included, just code and comments
::Karafka::EVENTS.each do |event_name|
::Karafka.monitor.subscribe(event_name) do |event|
# Align with ActiveSupport::Notifications naming convention
event = (event_name.split('.').reverse << 'karafka').join('.')
# Instrument via ActiveSupport
::ActiveSupport::Notifications.instrument(
event_name,
**event.payload
)
end
end
ActiveSupport::Notifications.subscribe(
'consumed.consumer.karafka'
) do |event|
Rails.logger.info "[consumer.consumed]: #{event.inspect}"
end
83% Reduction
setting :processing do
setting :jobs_queue_class, default: Processing::JobsQueue
setting :scheduler_class, default: Processing::Schedulers::Default
setting :jobs_builder, default: Processing::JobsBuilder.new
setting :coordinator_class, default: Processing::Coordinator
setting :partitioner_class, default: Processing::Partitioner
setting :strategy_selector, default: Processing::StrategySelector.new
setting :expansions_selector, default: Processing::ExpansionsSelector.new
setting :executor_class, default: Processing::Executor
end
icfg.processing.coordinator_class = Pro::Processing::Coordinator
icfg.processing.partitioner_class = Pro::Processing::Partitioner
icfg.processing.scheduler_class = Pro::Processing::Schedulers::Default
icfg.processing.jobs_queue_class = Pro::Processing::JobsQueue
icfg.processing.executor_class = Pro::Processing::Executor
icfg.processing.jobs_builder = Pro::Processing::JobsBuilder.new
icfg.processing.strategy_selector = Pro::Processing::StrategySelector.new
icfg.processing.expansions_selector = Pro::Processing::ExpansionsSelector.new
def dead_letter_queue(
max_retries: 3,
topic: nil
)
@dead_letter_queue ||= Config.new(
active: !topic.nil?,
max_retries: max_retries,
topic: topic
)
end
def dead_letter_queue(
max_retries: 3,
topic: nil,
independent: false,
transactional: true
)
@dead_letter_queue ||= Config.new(
active: !topic.nil?,
max_retries: max_retries,
topic: topic,
independent: independent,
transactional: transactional
)
end
# @api public
# @since 2.0.0
def app_name
slice_name
end
# @return [Hanami::SliceName]
#
# @api private
# @since 2.0.0
attr_reader :app_name
def _protected_ivars
PROTECTED_IVARS
end
private :_protected_ivars
def _extract_redirect_to_status(options, response_options)
if options.is_a?(Hash) && options.key?(:status)
Rack::Utils.status_code(options.delete(:status))
elsif response_options.key?(:status)
Rack::Utils.status_code(response_options[:status])
else
302
end
end
Addy Osmani / https://read-dx.addy.ie/chapter-good-dx
Gem | Framework |
---|---|
Sidekiq | Custom |
ActiveAdmin | Rails |
Resque | Sinatra |
Karafka | Roda |
MailCatcher | Sinatra |
Blazer | Rails |
Ahoy (ahoy_matey) | Rails |
Maintaining data on the user side is tough
{
"schema_version": "1.2.2",
"type": "consumer",
"dispatched_at": 2690883271.5755131,
"process": {
"started_at": 2690818651.8229299,
"name": "shinra:1:1",
"status": "running",
"listeners": {
"active": 2,
"standby": 0
}
}
class Inflector < Zeitwerk::GemInflector
MIGRATION_ABSPATH_REGEXP = /migrations\/[0-9]+_(.*)/
MIGRATION_BASENAME_REGEXP = /\A[0-9]+_(.*)/
def camelize(basename, abspath)
return super unless abspath.match?(MIGRATION_ABSPATH_REGEXP)
return super unless basename.match?(MIGRATION_BASENAME_REGEXP)
super(
basename.match(MIGRATION_BASENAME_REGEXP).to_a.last,
abspath
)
end
end
loader.inflector = Inflector.new("#{root}/karafka/web.rb")
# migrations/1700234522_introduce_waiting_in_consumers_metrics.rb
class IntroduceWaitingInConsumersMetrics < Base
self.versions_until = '1.1.1'
self.type = :consumers_metrics
def migrate(state)
state[:aggregated].each_value do |metrics|
metrics.each do |metric|
metric.last[:waiting] = 0
end
end
end
end
class Consumer < Karafka::BaseConsumer
def consume
messages.each do |message|
DT[0] << message.raw_payload
end
end
end
elements = DT.uuids(100)
produce_many(DT.topic, elements)
Karafka::Server.run
assert_equal elements, DT[0]
Rails / Ruby | Ruby 2.7 | Ruby 3.0 | Ruby 3.1 | Ruby 3.2 | Ruby 3.3 |
---|---|---|---|---|---|
Rails 6.0 | 6.0 - 2.7 | 6.0 - 3.0 | 6.0 - 3.1 | 6.0 - 3.2 | 6.0 - 3.3 |
Rails 6.1 | 6.1 - 2.7 | 6.1 - 3.0 | 6.1 - 3.1 | 6.1 - 3.2 | 6.1 - 3.3 |
Rails 7.0 | 7.0 - 2.7 | 7.0 - 3.0 | 7.0 - 3.1 | 7.0 - 3.2 | 7.0 - 3.3 |
Rails 7.1 | 7.1 - 2.7 | 7.1 - 3.0 | 7.1 - 3.1 | 7.1 - 3.2 | 7.1 - 3.3 |
For more details, find me on Twitter:
⭐ github.com/karafka/karafka 🙏