NAME
    Kafka::Consumer::Avro - Avro message consumer for Apache Kafka.

SYNOPSIS
        use 5.010;    # enables say()
        use Kafka qw/$DEFAULT_MAX_BYTES/;
        use Kafka::Connection;
        use Kafka::Consumer::Avro;
        use Confluent::SchemaRegistry;

        my $connection = Kafka::Connection->new( host => 'localhost' );

        my $consumer = Kafka::Consumer::Avro->new(
            Connection     => $connection,
            SchemaRegistry => Confluent::SchemaRegistry->new()
        );

        # Consuming messages
        my $messages = $consumer->fetch(
            'mytopic',            # topic
            0,                    # partition
            0,                    # offset
            $DEFAULT_MAX_BYTES    # maximum size of MESSAGE(s) to receive
        );

        if ($messages) {
            foreach my $message (@$messages) {
                if ( $message->valid ) {
                    say 'payload    : ', $message->payload;
                    say 'key        : ', $message->key;
                    say 'offset     : ', $message->offset;
                    say 'next_offset: ', $message->next_offset;
                }
                else {
                    say 'error      : ', $message->error;
                }
            }
        }

        # Closes the consumer and cleans up
        undef $consumer;
        $connection->close;
        undef $connection;

DESCRIPTION
    The main feature of "Kafka::Consumer::Avro" is to provide an
    object-oriented API to consume messages according to *Confluent
    SchemaRegistry* and *Avro* serialization.

    "Kafka::Consumer::Avro" inherits from and extends Kafka::Consumer.

INSTALL
    Installation of "Kafka::Consumer::Avro" is canonical:

        perl Makefile.PL
        make
        make test
        make install

  TEST NOTES
    Tests focus on verifying Avro-formatted messages and their interactions
    with Confluent Schema Registry, and are intended to extend the
    "Kafka::Consumer" test suite.

    They expect Kafka and Schema Registry to be available on the target
    machine, listening on "localhost" on their default ports; otherwise
    most of the tests are skipped.

USAGE
  CONSTRUCTOR
   "new"
    Creates a new consumer client object.

    "new()" takes arguments in key-value pairs as described in
    Kafka::Consumer, from which it inherits.

    In addition, it takes the following arguments:

    "SchemaRegistry => $schema_registry" (mandatory)
        Is a Confluent::SchemaRegistry instance.
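
    As background on why a schema registry is required: messages written
    by Confluent-compatible serializers are commonly framed as one magic
    byte (0), a 4-byte big-endian schema ID, and then the Avro-encoded
    body. The sketch below only illustrates that framing with core Perl;
    it assumes this module follows the standard framing and is not taken
    from its implementation. The helper name "split_confluent_frame" and
    the sample bytes are hypothetical.

```perl
use strict;
use warnings;

# Split a Confluent-framed message into its schema ID and Avro body.
# Assumption: 1 magic byte (0), a 4-byte big-endian schema ID, then the
# Avro binary payload. Returns an empty list if the frame is not valid.
sub split_confluent_frame {
    my ($bytes) = @_;
    return unless defined $bytes && length($bytes) >= 5;

    # C = unsigned 8-bit value, N = unsigned 32-bit big-endian value
    my ( $magic, $schema_id ) = unpack 'C N', $bytes;
    return unless $magic == 0;

    return ( $schema_id, substr( $bytes, 5 ) );
}

# Hypothetical frame: schema ID 42 followed by three placeholder bytes.
my $frame = pack( 'C N', 0, 42 ) . "\x02\x04\x06";
my ( $id, $body ) = split_confluent_frame($frame);
printf "schema id: %d, body length: %d\n", $id, length $body;
```

    The schema ID is what a consumer would look up in the registry to
    obtain the writer schema before decoding the Avro body.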

  METHODS
    The following methods are defined for the "Kafka::Consumer::Avro"
    class:

   "schema_registry"()
    Returns the Confluent::SchemaRegistry instance supplied to the
    constructor.

   "get_error"()
    Returns a string containing the last error message.

   "fetch( %params )"
    Gets messages from a Kafka topic.

    Please see Kafka::Consumer for more details.

AUTHOR
    Alvaro Livraghi

CONTRIBUTE

BUGS
    Please use the GitHub project page to report problems or to contact
    the author.

COPYRIGHT AND LICENSE
    Copyright 2018 by Alvaro Livraghi

    This program is free software; you can redistribute it and/or modify it
    under the same terms as Perl itself.