Write Records to Streams in HStreamDB
This page provides a tutorial on how to write data to streams in HStreamDB with clients such as hstreamdb-java.
To write data to HStreamDB, we need to pack messages as HStream Records, and we need a producer that creates and sends the messages to the server.
HStream Record
All data in a stream is stored in the form of HStream Records. HStreamDB supports the following two kinds of HStream Record:
- HRecord: can be thought of as a piece of JSON data, like a document in some NoSQL databases.
- Raw Record: arbitrary binary data.
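The Java client wraps both kinds in the same Record type. As a minimal sketch (the builder calls mirror the full examples below; the payload values here are placeholders):
import io.hstream.HRecord;
import io.hstream.Record;
import java.nio.charset.StandardCharsets;

// An HRecord is a JSON-like document built with HRecord.newBuilder()
HRecord hRecord = HRecord.newBuilder().put("id", 10).put("isReady", true).build();
Record jsonRecord = Record.newBuilder().hRecord(hRecord).build();

// A Raw Record carries an arbitrary binary payload
Record rawRecord =
Record.newBuilder().rawRecord("some-bytes".getBytes(StandardCharsets.UTF_8)).build();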
End-to-End Compression
To reduce transfer overhead and maximize bandwidth utilization, HStreamDB supports compressing written HStream Records. Users can set the compression algorithm when creating a BufferedProducer. The currently available algorithms are gzip and zstd. When clients consume data from HStreamDB, decompression is performed automatically.
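For example, a minimal sketch in Java, assuming the BufferedProducer builder exposes a compressionType option taking an io.hstream.CompressionType enum (the Go counterpart, hstream.WithCompression(compression.Zstd), appears in the last example on this page):
import io.hstream.BufferedProducer;
import io.hstream.CompressionType;
import io.hstream.HStreamClient;

// `client` is an existing HStreamClient; compression is configured per producer.
// Records are compressed on the client side before sending; consumers
// decompress them automatically.
BufferedProducer producer =
client.newBufferedProducer()
.stream("your_stream_name")
.compressionType(CompressionType.ZSTD) // or CompressionType.GZIP
.build();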
Write HStream Records
There are two ways to write records to HStreamDB. For simplicity, you can start with the Producer from client.newProducer(). This Producer provides no configuration options; it simply sends every record it receives to the HServer in parallel as soon as it arrives, which means it cannot guarantee the order of the records. In production environments, the BufferedProducer from client.newBufferedProducer() is usually the better choice: it buffers records in order, packs them into batches, and sends each batch to the server. When a record is written to a stream, the HServer generates a corresponding record ID for it and sends it back to the client. The record ID is unique within the stream.
Using Producer
package docs.code.examples;
import io.hstream.*;
import io.hstream.Record;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
public class WriteDataSimpleExample {
public static void main(String[] args) throws Exception {
// TODO (developers): Replace these variables for your own use cases.
String serviceUrl = "hstream://127.0.0.1:6570";
if (System.getenv("serviceUrl") != null) {
serviceUrl = System.getenv("serviceUrl");
}
String streamName1 = "your_h_records_stream_name";
String streamName2 = "your_raw_records_stream_name";
// We do not recommend writing both raw data and HRecord data into the same stream.
HStreamClient client = HStreamClient.builder().serviceUrl(serviceUrl).build();
writeHRecordData(client, streamName1);
writeRawData(client, streamName2);
client.close();
}
public static void writeHRecordData(HStreamClient client, String streamName) {
// Create a basic producer for low latency scenarios
// For high throughput scenarios, please see the next section "Using `BufferedProducer`s"
Producer producer = client.newProducer().stream(streamName).build();
HRecord hRecord =
HRecord.newBuilder()
// Number
.put("id", 10)
// Boolean
.put("isReady", true)
// List
.put("targets", HArray.newBuilder().add(1).add(2).add(3).build())
// String
.put("name", "hRecord-example")
.build();
for (int i = 0; i <= 10; i++) {
Record record = Record.newBuilder().hRecord(hRecord).build();
// If the data is written successfully, returns a server-assigned record id
CompletableFuture<String> recordId = producer.write(record);
System.out.println("Wrote message ID: " + recordId.join());
}
}
private static void writeRawData(HStreamClient client, String streamName) {
Producer producer = client.newProducer().stream(streamName).build();
List<String> messages = Arrays.asList("first", "second");
for (final String message : messages) {
Record record =
Record.newBuilder().rawRecord(message.getBytes(StandardCharsets.UTF_8)).build();
CompletableFuture<String> recordId = producer.write(record);
System.out.println("Published message ID: " + recordId.join());
}
}
}
package examples
import (
"github.com/hstreamdb/hstreamdb-go/hstream"
"github.com/hstreamdb/hstreamdb-go/hstream/Record"
"log"
)
func ExampleWriteProducer() error {
client, err := hstream.NewHStreamClient(YourHStreamServiceUrl)
if err != nil {
log.Fatalf("Creating client error: %s", err)
}
defer client.Close()
producer, err := client.NewProducer("testStream")
if err != nil {
log.Fatalf("Creating producer error: %s", err)
}
defer producer.Stop()
payload := []byte("Hello HStreamDB")
rawRecord, err := Record.NewHStreamRawRecord("testStream", payload)
if err != nil {
log.Fatalf("Creating raw record error: %s", err)
}
for i := 0; i < 100; i++ {
appendRes := producer.Append(rawRecord)
if resp, err := appendRes.Ready(); err != nil {
log.Printf("Append error: %s", err)
} else {
log.Printf("Append response: %s", resp)
}
}
return nil
}
# https://github.com/hstreamdb/hstreamdb-py/blob/main/examples/snippets/guides.py
import asyncio
import hstreamdb
import os
# NOTE: Replace with your own host and port
host = os.getenv("GUIDE_HOST", "127.0.0.1")
port = os.getenv("GUIDE_PORT", 6570)
stream_name = "your_stream"
subscription = "your_subscription"
# Run: asyncio.run(main(your_async_function))
async def main(*funcs):
async with await hstreamdb.insecure_client(host=host, port=port) as client:
for f in funcs:
await f(client)
async def append_records(client):
payloads = [b"some_raw_binary_bytes", {"msg": "hi"}]
rs = await client.append(stream_name, payloads)
for r in rs:
print("Append done, ", r)
Using BufferedProducer
In almost all cases, we recommend using BufferedProducer, not only because it offers higher throughput, but also because it provides more flexible configuration, letting you trade off between throughput and latency as needed. You can control the flush triggers and the buffer size through the following two settings of BufferedProducer. With BatchSetting, you can decide when to send a batch based on the maximum record count of a batch, the total bytes of a batch, and the maximum age of a batch. With FlowControlSetting, you can set the buffer size and policy for all buffered records. The code example below shows how to use BatchSetting to set the triggers that tell producers when to flush, and FlowControlSetting to limit the maximum number of bytes a BufferedProducer may buffer.
package docs.code.examples;
import io.hstream.*;
import io.hstream.Record;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.CompletableFuture;
public class WriteDataBufferedExample {
public static void main(String[] args) throws Exception {
// TODO (developers): Replace these variables for your own use cases.
String serviceUrl = "hstream://127.0.0.1:6570";
if (System.getenv("serviceUrl") != null) {
serviceUrl = System.getenv("serviceUrl");
}
String streamName = "your_h_records_stream_name";
HStreamClient client = HStreamClient.builder().serviceUrl(serviceUrl).build();
writeHRecordDataWithBufferedProducers(client, streamName);
client.close();
}
public static void writeHRecordDataWithBufferedProducers(
HStreamClient client, String streamName) {
BatchSetting batchSetting =
BatchSetting.newBuilder()
// optional, default: 100, the max records count of a batch,
// disable the trigger if the value <= 0.
.recordCountLimit(100)
// optional, default: 4096(4KB), the max bytes size of a batch,
// disable the trigger if the value <= 0.
.bytesLimit(4096)
// optional, default: 100(ms), the max age of a buffering batch,
// disable the trigger if the value <= 0.
.ageLimit(100)
.build();
// FlowControlSetting controls the total amount of data held by the producer,
// including buffered batch records and records being sent
FlowControlSetting flowControlSetting =
FlowControlSetting.newBuilder()
// optional, default: 104857600 (100MB), the total bytes limit, including
// buffered batch records and records being sent; the value must be
// greater than batchSetting.bytesLimit
.bytesLimit(40960)
.build();
BufferedProducer producer =
client.newBufferedProducer().stream(streamName)
.batchSetting(batchSetting)
.flowControlSetting(flowControlSetting)
.build();
List<CompletableFuture<String>> recordIds = new ArrayList<>();
Random random = new Random();
for (int i = 0; i < 100; i++) {
double temp = random.nextInt(100) / 10.0 + 15;
HRecord hRecord = HRecord.newBuilder().put("temperature", temp).build();
Record record = Record.newBuilder().hRecord(hRecord).build();
CompletableFuture<String> recordId = producer.write(record);
recordIds.add(recordId);
}
// Closing a producer calls flush() first
producer.close();
for (CompletableFuture<String> recordId : recordIds) {
System.out.println("Wrote message ID: " + recordId.join());
}
}
}
package examples
import (
"github.com/hstreamdb/hstreamdb-go/hstream"
"github.com/hstreamdb/hstreamdb-go/hstream/Record"
"log"
)
func ExampleWriteBatchProducer() error {
client, err := hstream.NewHStreamClient(YourHStreamServiceUrl)
if err != nil {
log.Fatalf("Creating client error: %s", err)
}
defer client.Close()
producer, err := client.NewBatchProducer("testDefaultStream", hstream.WithBatch(10, 500))
if err != nil {
log.Fatalf("Creating producer error: %s", err)
}
defer producer.Stop()
result := make([]hstream.AppendResult, 0, 100)
for i := 0; i < 100; i++ {
rawRecord, _ := Record.NewHStreamHRecord("", map[string]interface{}{
"id": i,
"isReady": true,
"name": "hRecord-example",
})
r := producer.Append(rawRecord)
result = append(result, r)
}
for i, res := range result {
resp, err := res.Ready()
if err != nil {
log.Printf("write error: %s\n", err.Error())
continue
}
log.Printf("record[%d]=%s\n", i, resp.String())
}
return nil
}
# https://github.com/hstreamdb/hstreamdb-py/blob/main/examples/snippets/guides.py
import asyncio
import hstreamdb
import os
# NOTE: Replace with your own host and port
host = os.getenv("GUIDE_HOST", "127.0.0.1")
port = os.getenv("GUIDE_PORT", 6570)
stream_name = "your_stream"
subscription = "your_subscription"
# Run: asyncio.run(main(your_async_function))
async def main(*funcs):
async with await hstreamdb.insecure_client(host=host, port=port) as client:
for f in funcs:
await f(client)
class AppendCallback(hstreamdb.BufferedProducer.AppendCallback):
count = 0
def on_success(self, stream_name, payloads, stream_keyid):
self.count += 1
print(
f"Batch {self.count}: Append success with {len(payloads)} payloads."
)
def on_fail(self, stream_name, payloads, stream_keyid, e):
print("Append failed!")
print(e)
async def buffered_append_records(client):
p = client.new_producer(
append_callback=AppendCallback(),
size_trigger=10240,
time_trigger=0.5,
retry_count=2,
)
for i in range(50):
await p.append(stream_name, b"some_raw_binary_bytes")
await p.append(stream_name, {"msg": "hello"})
await p.wait_and_close()
Using Partition Keys
Records with the same partition key are guaranteed to be written in order through a BufferedProducer. Partitioning, another important feature of HStreamDB, also uses these partition keys to decide which partition each record is assigned to, improving write and read performance. See Manage Shards of the Stream for a more detailed explanation.
Following the examples below, you can easily write records with partition keys.
package docs.code.examples;
import io.hstream.*;
import io.hstream.Record;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.CompletableFuture;
public class WriteDataWithKeyExample {
public static void main(String[] args) throws Exception {
// TODO (developers): Replace these variables for your own use cases.
String serviceUrl = "hstream://127.0.0.1:6570";
if (System.getenv("serviceUrl") != null) {
serviceUrl = System.getenv("serviceUrl");
}
String streamName = "your_h_records_stream_name";
HStreamClient client = HStreamClient.builder().serviceUrl(serviceUrl).build();
writeHRecordDataWithKey(client, streamName);
client.close();
}
public static void writeHRecordDataWithKey(HStreamClient client, String streamName) {
// For demonstration, we will use the following partition keys for the records.
// As mentioned in the documentation, if a record has no partition key, it will
// get a default key and be mapped to some default shard.
String key1 = "South";
String key2 = "North";
// Create a buffered producer with default BatchSetting and FlowControlSetting.
BufferedProducer producer = client.newBufferedProducer().stream(streamName).build();
List<CompletableFuture<String>> recordIds = new ArrayList<>();
Random random = new Random();
for (int i = 0; i < 100; i++) {
double temp = random.nextInt(100) / 10.0 + 15;
Record record;
if ((i % 3) == 0) {
HRecord hRecord = HRecord.newBuilder().put("temperature", temp).put("withKey", 1).build();
record = Record.newBuilder().hRecord(hRecord).partitionKey(key1).build();
} else {
HRecord hRecord = HRecord.newBuilder().put("temperature", temp).put("withKey", 2).build();
record = Record.newBuilder().hRecord(hRecord).partitionKey(key2).build();
}
CompletableFuture<String> recordId = producer.write(record);
recordIds.add(recordId);
}
producer.close();
for (CompletableFuture<String> recordId : recordIds) {
System.out.println("Wrote message ID: " + recordId.join());
}
}
}
package examples
import (
"fmt"
"github.com/hstreamdb/hstreamdb-go/hstream"
"github.com/hstreamdb/hstreamdb-go/hstream/Record"
"github.com/hstreamdb/hstreamdb-go/hstream/compression"
"log"
"math/rand"
"sync"
)
func ExampleWriteBatchProducerMultiKey() error {
client, err := hstream.NewHStreamClient(YourHStreamServiceUrl)
if err != nil {
log.Fatalf("Creating client error: %s", err)
}
defer client.Close()
producer, err := client.NewBatchProducer("testStream",
// optional: set record count and max batch bytes trigger
hstream.WithBatch(10, 500),
// optional: set timeout trigger
hstream.TimeOut(1000),
// optional: set client compression
hstream.WithCompression(compression.Zstd),
// optional: set flow control
hstream.WithFlowControl(81920000))
if err != nil {
log.Fatalf("Creating producer error: %s", err)
}
defer producer.Stop()
keys := []string{"sensor1", "sensor2", "sensor3", "sensor4", "sensor5"}
rids := sync.Map{}
wg := sync.WaitGroup{}
wg.Add(5)
for _, key := range keys {
go func(key string) {
result := make([]hstream.AppendResult, 0, 100)
for i := 0; i < 100; i++ {
temp := float64(rand.Intn(100))/10.0 + 15
rawRecord, _ := Record.NewHStreamHRecord(key, map[string]interface{}{
key: fmt.Sprintf("temperature=%.1f", temp),
})
r := producer.Append(rawRecord)
result = append(result, r)
}
rids.Store(key, result)
wg.Done()
}(key)
}
wg.Wait()
rids.Range(func(key, value interface{}) bool {
k := key.(string)
res := value.([]hstream.AppendResult)
for i := 0; i < 100; i++ {
resp, err := res[i].Ready()
if err != nil {
log.Printf("write error: %s\n", err.Error())
continue
}
log.Printf("[key: %s]: record[%d]=%s\n", k, i, resp.String())
}
return true
})
return nil
}