Clojure - Kafka's ZookeeperConsumerConnector.createMessageStreams never returns
I'm trying to retrieve data from a Kafka 0.8.1 cluster. I have created an instance of ZookeeperConsumerConnector and attempt to call createMessageStreams on it. However, no matter what I do, createMessageStreams seems to hang and never return, even when it is the only thing I have done with Kafka.
From reading the mailing lists, it seems this can happen for a few reasons, but as far as I can tell I haven't done any of those things.
Further, I'll point out that I'm doing this in Clojure using clj-kafka, but I suspect clj-kafka is not the issue, because I have the same problem if I run this code:
(.createMessageStreams (clj-kafka.consumer.zk/consumer {"zookeeper.connect" "127.0.0.1:2181" "group.id" "my.consumer" "auto.offset.reset" "smallest" "auto.commit.enable" "false"}) {"mytopic" (int 1)})
and clj-kafka.consumer.zk/consumer just uses Consumer.createJavaConsumerConnector to create a ZookeeperConsumerConnector without doing anything fancy.
Also, there are messages in "mytopic", because from the command line I can run the following and see everything I've sent to the topic:
% kafka-console-consumer.sh --zookeeper 127.0.0.1:2181 --topic mytopic --from-beginning
So it's not that the topic is empty.
I'm feeling stumped at this point. Any ideas?
ETA: By "hang" I guess I mean that it seems to spin up a thread and then stay stuck in it, never doing anything. If I run this code from the REPL, I can get out of it by hitting Control-C, and then I get this error:
IllegalMonitorStateException java.util.concurrent.locks.ReentrantLock$Sync.tryRelease (ReentrantLock.java:155)
I am experiencing the same issue, with the same exception when interrupting the REPL. The reason it hangs is the lazy-iterate function in the consumer.zk namespace. The queue that messages are read from is a LinkedBlockingQueue, and the call to .hasNext in the lazy-iterate function ends up calling .take on that queue. .take acquires the queue's lock and blocks until an element is available to take off the queue. This means that once no more messages arrive, the lazy-iterate function never returns. lazy-iterate is called by the 'messages' function, and if you don't do something like
(take 2 (messages "mytopic" some-consumer))
then the messages function will never return and will hang indefinitely. In my opinion this is a bug (or a design flaw) in clj-kafka. To confirm that this is indeed what's happening, try setting "consumer.timeout.ms" to "0" in the consumer config. The consumer will then throw a ConsumerTimeoutException and return control to the REPL.
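The blocking behavior described above can be reproduced without Kafka. The following is a minimal sketch that assumes only java.util.concurrent; `queue-seq` is a hypothetical helper, not clj-kafka's lazy-iterate. With no timeout it calls .take, so realizing the seq past the last available element blocks forever. With a timeout it uses .poll and simply ends the seq, which loosely mirrors what consumer.timeout.ms does (Kafka throws an exception instead of ending quietly).

```clojure
(import '(java.util.concurrent LinkedBlockingQueue TimeUnit))

;; Hypothetical helper, NOT part of clj-kafka: a lazy seq over a blocking queue.
;; timeout-ms nil  -> uses .take, blocks forever once the queue is empty.
;; timeout-ms n    -> uses .poll with a timeout, ends the seq when nothing arrives.
(defn queue-seq
  [^LinkedBlockingQueue q timeout-ms]
  (lazy-seq
    (let [x (if timeout-ms
              (.poll q (long timeout-ms) TimeUnit/MILLISECONDS)
              (.take q))]
      ;; .poll returns nil on timeout, which terminates the seq here.
      (when (some? x)
        (cons x (queue-seq q timeout-ms))))))

;; Bounded realization works even with the blocking variant:
;;   (take 2 (queue-seq (LinkedBlockingQueue. ["a" "b" "c"]) nil))
;; Full realization of the blocking variant never returns:
;;   (vec (queue-seq (LinkedBlockingQueue. ["a" "b" "c"]) nil))  ; hangs
;; The timeout variant drains the queue and then stops:
;;   (vec (queue-seq (LinkedBlockingQueue. ["a" "b" "c"]) 50))
```

Because `take` only realizes as many elements as it needs, `(take 2 ...)` never reaches the blocking call on an empty queue. That is why bounding the seq avoids the hang while printing or fully consuming it does not.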
This further creates a problem with the 'with-resource' macro. The macro takes a binding for the consumer, a shutdown function, and a body; it calls the body and then the shutdown fn. If you call 'messages' inside the body, the body never returns, so the shutdown function is never called. The messages function would return if shutdown were called, because shutdown puts a message on the queue that signals the consumer to clean up its resources and threads in preparation for GC. With this macro, the only way out of the main loop is to kill the application (or the thread calling it). The library has a way to go before it's ready for a production environment.
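The deadlock described above comes from running the blocking loop and the shutdown call on the same thread. A minimal sketch of the alternative, simulated with a plain LinkedBlockingQueue: the consuming loop runs in a future, and the calling thread stays free to "shut down" by putting a sentinel on the queue. The names `shutdown-sentinel`, `consume-until-shutdown`, `shutdown!` and `demo` are hypothetical stand-ins, not clj-kafka or Kafka APIs. Whether clj-kafka's real shutdown fn is safe to call from another thread in this way is an assumption to verify.

```clojure
(import '(java.util.concurrent LinkedBlockingQueue))

;; Hypothetical sentinel standing in for the shutdown message that the
;; consumer's shutdown puts on the queue.
(def shutdown-sentinel ::shutdown)

(defn consume-until-shutdown
  "Blocks on .take, collecting items until the sentinel arrives."
  [^LinkedBlockingQueue q]
  (loop [acc []]
    (let [x (.take q)]
      (if (= x shutdown-sentinel)
        acc
        (recur (conj acc x))))))

(defn shutdown!
  "Hypothetical stand-in for the consumer's shutdown fn."
  [^LinkedBlockingQueue q]
  (.put q shutdown-sentinel))

(defn demo []
  (let [q      (LinkedBlockingQueue.)
        ;; The blocking loop runs on another thread...
        worker (future (consume-until-shutdown q))]
    (.put q "m1")
    (.put q "m2")
    ;; ...so this thread can still reach the shutdown call.
    (shutdown! q)
    ;; Bounded deref so a bug cannot hang the caller forever.
    (deref worker 5000 ::timed-out)))
```

Calling `(demo)` should return `["m1" "m2"]`, since the queue is FIFO and the sentinel is enqueued after both messages. Inside a with-resource style macro, the equivalent would be to start consumption in a future in the body and let the body return, so the macro's shutdown call is actually reached.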