clojure - Kafka's ZookeeperConsumerConnector.createMessageStreams never returns


I'm trying to retrieve data from a Kafka 0.8.1 cluster. I have created an instance of ZookeeperConsumerConnector and then attempt to call createMessageStreams on it. However, no matter what I do, createMessageStreams seems to hang and never return, even if it is the only thing I have done with Kafka.

From reading the mailing lists, it seems this can happen for a few reasons, but as far as I can tell I haven't done any of those things.

Further, I'll point out that I'm doing this in Clojure using clj-kafka, but I suspect clj-kafka is not the issue, because I have the problem even if I run this code:

(.createMessageStreams
  (clj-kafka.consumer.zk/consumer {"zookeeper.connect" "127.0.0.1:2181"
                                   "group.id" "my.consumer"
                                   "auto.offset.reset" "smallest"
                                   "auto.commit.enable" "false"})
  {"mytopic" (int 1)})

And clj-kafka.consumer.zk/consumer uses Consumer.createJavaConsumerConnector to create the ZookeeperConsumerConnector without doing anything fancy.

Also, there are messages in "mytopic", because from the command line I can run the following and get back what I've already sent to the topic:

% kafka-console-consumer.sh --zookeeper 127.0.0.1:2181 --topic mytopic --from-beginning 

So it's not that the topic is empty.

I'm feeling stumped at this point. Any ideas?

ETA: By "hang" I guess I mean that it seems to spin up a thread and then stay stuck in it, never doing anything. If I run this code from the REPL, I can get out of it by hitting Control-C, and then I get this error:

IllegalMonitorStateException   java.util.concurrent.locks.ReentrantLock$Sync.tryRelease (ReentrantLock.java:155)

I was experiencing the same issue, with the same exception when interrupting the REPL. The reason it hangs is the lazy-iterate function in the consumer.zk namespace. The queue that messages are read from is a LinkedBlockingQueue, and the call to .hasNext in the lazy-iterate function calls .take on this queue. That acquires a lock on the queue and blocks, waiting until something is available to take off the queue. This means the lazy-iterate function will never return while the queue is empty. lazy-iterate is called by the 'messages' function, and if you don't do something like

(take 2 (messages "mytopic" some-consumer)) 

then the messages function will never return and will hang indefinitely. In my opinion this is a bug (or a design flaw) in clj-kafka. To illustrate that this is indeed what's happening, try setting "consumer.timeout.ms" to "0" in your consumer config. It will throw a timeout exception (Kafka's ConsumerTimeoutException) and return control to the REPL.
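The blocking behaviour described above can be reproduced without Kafka at all. The following is a minimal sketch, assuming a plain LinkedBlockingQueue as a stand-in for the consumer's internal queue; blocking-seq and timed-seq are hypothetical helpers written for this illustration and are not part of clj-kafka.

```clojure
(import '(java.util.concurrent LinkedBlockingQueue TimeUnit))

;; Hypothetical stand-in for the consumer's internal message queue.
(def queue (LinkedBlockingQueue.))

(defn blocking-seq
  "Lazy seq built on .take: realizing an element blocks until one arrives."
  [^LinkedBlockingQueue q]
  (lazy-seq (cons (.take q) (blocking-seq q))))

(defn timed-seq
  "Lazy seq built on .poll with a timeout: it ends when nothing arrives in time."
  [^LinkedBlockingQueue q timeout-ms]
  (lazy-seq
    (when-let [msg (.poll q timeout-ms TimeUnit/MILLISECONDS)]
      (cons msg (timed-seq q timeout-ms)))))

(.put queue "m1")
(.put queue "m2")

;; Bounded realization returns, because exactly two messages are queued.
;; Asking for a third element here would block on .take indefinitely.
(def first-two (doall (take 2 (blocking-seq queue))))

(.put queue "m3")

;; The timed variant drains what is available and then terminates.
(def drained (doall (timed-seq queue 50)))
```

Realizing the whole of blocking-seq (for example by printing it at the REPL) never finishes, which matches the hang described above; the timed variant is roughly what a non-negative "consumer.timeout.ms" buys you, except that Kafka signals the timeout with an exception rather than by ending the sequence.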

This also creates a problem with the 'with-resource' macro. The macro takes a binding for the consumer, a shutdown function, and a body; it calls the body and then the shutdown fn. If you call 'messages' inside the body, the body will never return, so the shutdown function will never be called. The messages function would return if shutdown were called, because shutdown puts a message on the queue that signals the consumer to clean up its resources and threads in preparation for GC. So the macro puts the application into a state where the only way out of the main loop is to kill the application (or the thread calling it) itself. The library has a ways to go before it's ready for a production environment.
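To make the deadlock concrete, here is a simplified sketch of a macro in the style of 'with-resource', again using a plain LinkedBlockingQueue instead of a real consumer. with-resource-sketch, consume-until-shutdown, shutdown! and the ::shutdown marker are all hypothetical names chosen for this illustration; the real clj-kafka code may differ in detail.

```clojure
(import '(java.util.concurrent LinkedBlockingQueue))

;; Simplified sketch of a with-resource style macro:
;; evaluate the body, then always call the shutdown function.
(defmacro with-resource-sketch
  [binding shutdown-fn & body]
  `(let ~binding
     (try
       ~@body
       (finally (~shutdown-fn ~(first binding))))))

;; Hypothetical marker that plays the role of the shutdown message.
(def shutdown-marker ::shutdown)

(defn consume-until-shutdown
  "Blocks on .take and collects messages until the shutdown marker arrives."
  [^LinkedBlockingQueue q]
  (loop [acc []]
    (let [msg (.take q)]
      (if (= msg shutdown-marker)
        acc
        (recur (conj acc msg))))))

(def shutdown-calls (atom 0))

(defn shutdown!
  "Counts the call and enqueues the marker that unblocks the consumer."
  [^LinkedBlockingQueue q]
  (swap! shutdown-calls inc)
  (.put q shutdown-marker))

(def result
  (with-resource-sketch [q (LinkedBlockingQueue.)]
    shutdown!
    (.put q "m1")
    ;; Without this helper thread the body would block forever on .take,
    ;; and the shutdown call in `finally` would never be reached.
    (doto (Thread. #(do (Thread/sleep 100) (shutdown! q)))
      (.setDaemon true)
      (.start))
    (consume-until-shutdown q)))
```

In this sketch the body only returns because a second thread calls shutdown! from outside; the call in `finally` runs afterwards and is too late to help. One possible workaround along these lines is to trigger shutdown from another thread, or to bound the read with something like (take n ...) or a consumer timeout.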

