Chronicle Queue入门

作者丨Shaolang Ai
译者 | 杨晓娟
用Chronicle Queue构建的应用程序不会让生产者放慢将消息放入队列的速度(没有背压机制)。


Hello, World!
import?org.jetbrains.kotlin.gradle.tasks.KotlinCompile??//?line?1
?
plugins?{
???id("org.jetbrains.kotlin.jvm")?version?"1.3.71"
??application
}
?
repositories?{
??mavenCentral()
??mavenLocal()
}
?
dependencies?{
???implementation("org.jetbrains.kotlin:kotlin-bom")
???implementation("org.jetbrains.kotlin:kotlin-stdlib-jdk8")
implementation("net.openhft.chronicle:chronicle-queue:5.19.8")??//?line?17
?
implementation("org.apache.logging.log4j:log4j-sl4fj18-impl:2.13.1")
}
application?{
??mainClass?=??"hello.AppKt"
}
?
tasks.withType<KotlinCompile>?{??????????//?line?25
?????kotlinOptions.jvmTarget?=?"1.8"
}
左右滑动查看完整代码
package?hello
?
import?net.openhft.chronicle.queue.ChronicleQueue
?
fun?main(args:?Array<String>)?{
??val?q:??ChronicleQueue?=?ChronicleQueue.single("./build/hello-world")
?
??try?{
????val?appender:??ExcerptAppender?=?q.acquireAppender()
?????appender.writeText("Hello,?World!")
?????
????val?tailer:??ExcerptTailer?=?q.createTailer()
?????println(tailer.readText())
??}?finally?{
???q.close()
??}
}

便道:摘录类型
Serializable对象:请注意,由于依赖于反射,序列化类对象的效率很低 Externalizable对象:如果与Java的兼容性很重要,但以牺牲手写逻辑为代价 net.openhft.chronicle.wire.Marshallable对象:使用二进制格式的高性能数据交换 net.openhft.chronicle.bytes.BytesMarshallable对象:底层二进制或文本编码
package?types
?
import?net.openhft.chronicle.wire.Marshallable
import??net.openhft.chronicle.wire.SelfDescribingMarshallable
?
class?Person(val?name:?String,?val?age:?Int):??SelfDescribingMarshallable()
?
fun?main(args:?Array<String>)?{
????val?person?=?Person("Shaolang",?3)
????val?outputString?=?"""
????!types.Person?{
??????name:?Shaolang
??????age:?3
????}
????""".trimIndent()
?
????println(person.toString()?==?outputString)
?
????val?p?=?Marshallable.fromString<Person>(outputString)
????println(person?==?p)
????println(person.hashCode()?==?p.hashCode())
}

写入和读取域对象
package?docs
?
import?net.openhft.chronicle.queue.ChronicleQueue
import??net.openhft.chronicle.wire.SelfDescribingMarshallable
?
class?Person(var?name:?String??=?null,?var?age:?Int??=??null):?SelfDescribingMarshallable()
?
class?Food(var?name:?String??=?null):??SelfDescribingMarshallable()
?
fun?main(args:?Array<String>)?{
????ChronicleQueue.single("./build/documents").use??{?q?->
????????val?appender?=??q.acquireAppender()?????????????????????????????????????????????????????
?????????appender.writeDocument(Person("Shaolang",?3))
????????appender.writeText("Hello,??World!")
????????appender.writeDocument(Food("Burger"))
?????????????????????????????????????????????????????
????????val?tailer?=?q.createTailer()
?????????????????????????????????????????????????????
????????val?person?=?Person()????????????????????????????????????????????????????
????????tailer.readDocument(person)
????????println(person)
?????????println("${tailer.readText()}\n")
?????????????????????????????????????????????????????
????????val?food?=?Food()
????????tailer.readDocument(food)
????????println(food)
????}
}
!docs.Person?{
??name:?Shaolang,
??age:?3
}
Hello,?World!
!docs.Food?{
??name:?Burger,
}由于Chronicle Queue的目标是不产生垃圾,因而要求域模型是可变对象;这就是为什么两个类在构造器中使用var而不是val。 Chronicle ? ? Queue允许appender将不同的内容写入同一队列。 tailer需要知道它应该读什么才能得到正确的结果。
!docs.Person?{
??name:?Burger,
??age:?!!null?""
}

只听感兴趣的东西
package?listener
?
import?net.openhft.chronicle.queue.ChronicleQueue
import?net.openhft.chronicle.queue.ChronicleReaderMain
import??net.openhft.chronicle.wire.SelfDescribingMarshallable
?
class?Person(var?name:?String??=?null,?var?age:?Int??=??null):?SelfDescribingMarshallable()
?
interface?PersonListener?{
????fun?onPerson(person:?Person)
}
?
fun?main(args:?Array<String>)?{
????val?directory?=?"./build/listener"
?
?????ChronicleQueue.single(directory).use?{?q?->
????????val??observable:?PersonListener?=?q.acquireAppender()
??????????????.methodWriter(PersonListener::class.java)
?????????observable.onPerson(Person("Shaolang",?3))
?????????observable.onPerson(Person("Elliot",?4))
????}
?
??????ChronicleReaderMain.main(arrayOf("-d",?directory))
}
0x47c900000000:
onPerson?{
??name:?Shaolang,
??age:?3
}
0x47c900000001:
onPerson?{
??name:?Elliot,
??age:?4
}
interface?PersonListener?{
????onPerson(person:?Person)
}
//?this?is?an?observer?because?it?implements?the?listener??interface
class?PersonRegistry:?PersonListener?{
????override?fun?onPerson(person:?Person)?{
????????//?code?omitted?for?brevity
????}
}
fun?main(args:?Array<String>)?{
????//?code?omitted?for?brevity
????val??observable:?PersonListener?=?q.acquireAppender()????//?this?is?an
??????????????.methodWriter(PersonListener::class.java)???????//??observable
????//??another?way?to?differentiate:?the?observer?will?never?call?the
????//??listener?method,?only?observables?do.
?????observable.onPerson(Person("Shaolang",?3))
????//?code?omitted?for?brevity
}
package?listener
?
import?net.openhft.chronicle.bytes.MethodReader
import?net.openhft.chronicle.queue.ChronicleQueue
import??net.openhft.chronicle.wire.SelfDescribingMarshallable
?
class?Person(var?name:?String??=?null,?var?age:?Int??=??null):?SelfDescribingMarshallable()
?
class?Food(var?name:?String??=?null):?SelfDescribingMarshallable()
?
interface?PersonListener?{
????fun?onPerson(person:?Person)
}
?
class?PersonRegistry:?PersonListener?{
????override?fun?onPerson(person:?Person)?{
????????println("in?registry:??${person.name}")
????}
}
?
fun?main(args:?Array<String>)?{
?????ChronicleQueue.single("./build/listener2").use?{?q?->
????????val?appender?=??q.acquireAppender()
????????val?writer:?PersonListener?=??appender.methodWriter(PersonListener::class.java)
?????????writer.onPerson(Person("Shaolang",?3))
?????????appender.writeDocument(Food("Burger"))
?????????writer.onPerson(Person("Elliot",?4))
?????????????????????????????????????????????????????
????????val?registry:?PersonRegistry?=??PersonRegistry()
????????val?reader:??MethodReader?=?q.createTailer().methodReader(registry)
????????reader.readOne()
????????reader.readOne()
????????reader.readOne()
????}
}

MethodReader使用鸭子类型
package?duck
?
import?net.openhft.chronicle.queue.ChronicleQueue
import??net.openhft.chronicle.wire.SelfDescribingMarshallable
?
class?Person(var?name:?String??=?null,?var?age:?Int??=??null):?SelfDescribingMarshallabl()
?
interface?PersonListener?{
????fun?onPerson(person:?Person)
}
?
interface?VIPListener?{
????fun?onPerson(person:?Person)
}
?
class?VIPClub:?VIPListener?{
????override?fun??onPerson(person:?Person)?{
????????println("Welcome?to?the??club,?${person.name}!")
????}
}
?
fun?main(args:?Array<String>)?{
????ChronicleQueue.single("./build/duck").use??{?q?->
????????val?writer?=??q.acquireAppender().methodWriter(PersonListener::class.java)
?????????writer.onPerson(Person("Shaolang",?3))
????????????????????????????????????????????????
????????val?club?=?VIPClub()
????????val?reader?=??q.createTailer().methodReader(club)
????????reader.readOne()
????}
}

package?restartable
?
import?net.openhft.chronicle.queue.ChronicleQueue
import?net.openhft.chronicle.queue.ExcerptTailer
?
fun?readQueue(tailerName:?String,?times:?Int)?{
?????ChronicleQueue.single("./build/restartable").use?{?q?->
?????????val?tailer?=?q.createTailer(tailerName)???????//?tailer??name?given
?????????for?(_n?in?1..times)?{
???????????println("$tailerName:?${tailer.readText()}")
????????}
?
????????println()????????//?to?separate?outputs?for?easier?visualization
????}
}
?
fun?main(args:?Array<String>)?{
?????ChronicleQueue.single("./build/restartable").use?{?q?->
????????val?appender?=??q.acquireAppender()
????????appender.writeText("Test??Message?1")
????????appender.writeText("Test??Message?2")
????????appender.writeText("Test?Message??3")
????????appender.writeText("Test??Message?4")
}
?
????readQueue("foo",?1)
????readQueue("bar",?2)
????readQueue("foo",?3)
????readQueue("bar",?1)
}
foo:?Test?Message?1
bar:?Test?Message?1
bar:?Test?Message?2
foo:?Test?Message?2
foo:?Test?Message?3
foo:?Test?Message?4
bar:?Test?Message?3?
滚动文件
package?roll
?
import?net.openhft.chronicle.queue.ChronicleQueue
import?net.openhft.chronicle.queue.RollCycles
import?net.openhft.chronicle.impl.single.SingleChronicleQueueBuilder
?
fun?main(args:?Array<String>)?{
????var??qbuilder:?SingleChronicleQueueBuilder?=??ChronicleQueue.singleBuilder("./build/roll")
?????qbuilder.rollCycle(RollCycles.HOURLY)
????val?q:??ChronicleQueue?=?qbuilder.build()
????//?code??omitted?for?brevity
}val?q:?ChronicleQueue?=?ChronicleQueue.singleBuilder("./build/roll")
??????????????????????????????????????.rollCycle(RollCycles.HOURLY)
??????????????????????????????????????.build()

Chronicle Queue GitHub repository Stack Overflow tagged questions Peter Lawre's Blog

关注公众号:拾黑(shiheibook)了解更多
[广告]赞助链接:
四季很好,只要有你,文娱排行榜:https://www.yaopaiming.com/
让资讯触达的更精准有趣:https://www.0xu.cn/







51CTO技术栈
关注网络尖刀微信公众号
