Skip to content
Open in Gitpod

创建和管理 Subscription

Subscription 的属性

  • ackTimeoutSeconds

    指定 HServer 在将 records 标记为 unacked 之前等待的最长时间,超过该时间后这些 records 将被重新发送。

  • maxUnackedRecords

    允许的未 acked record 的最大数量。超过设定的大小后,服务器将停止向相应的消费者发送 records。

创建一个 Subscription

每个 subscription 都必须指定要订阅哪个 stream,这意味着你必须确保要订阅的 stream 已经被创建。

关于订阅的名称,请参考资源命名准则

当创建一个 subscription 时,你可以像下面这样设置上述属性:

java
package docs.code.examples;

import io.hstream.HStreamClient;
import io.hstream.Subscription;

/** Example showing how to create a subscription on an existing stream. */
public class CreateSubscriptionExample {
  /**
   * Builds a client, creates the example subscription and closes the client.
   *
   * <p>The service URL defaults to a local address and can be overridden through the {@code
   * serviceUrl} environment variable.
   *
   * @param args unused
   * @throws Exception if creating the subscription or closing the client fails
   */
  public static void main(String[] args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String serviceUrl = "hstream://127.0.0.1:6570";
    String serviceUrlFromEnv = System.getenv("serviceUrl");
    if (serviceUrlFromEnv != null) {
      serviceUrl = serviceUrlFromEnv;
    }
    String streamName = "your_h_records_stream_name";
    HStreamClient client = HStreamClient.builder().serviceUrl(serviceUrl).build();
    try {
      createSubscriptionExample(client, streamName);
    } finally {
      // Release the client even when creating the subscription throws.
      client.close();
    }
  }

  /**
   * Creates a subscription on {@code streamName} with explicit options.
   *
   * @param client the client used to issue the request
   * @param streamName the stream to subscribe to
   */
  public static void createSubscriptionExample(HStreamClient client, String streamName) {
    // TODO(developer): Specify the options while creating the subscription
    String subscriptionId = "your_subscription_id";
    Subscription subscription =
        Subscription.newBuilder().subscription(subscriptionId).stream(streamName)
            // The default setting is 600 seconds
            .ackTimeoutSeconds(600)
            // The default setting is 10000 records
            .maxUnackedRecords(10000)
            // Set Subscription offset to EARLIEST, default setting is LATEST
            .offset(Subscription.SubscriptionOffset.EARLIEST)
            .build();
    client.createSubscription(subscription);
  }
}
go
package examples

import (
	"github.com/hstreamdb/hstreamdb-go/hstream"
	"log"
)

// ExampleCreateSubscription creates two subscriptions on the same stream with
// different options. It returns the first error encountered instead of exiting
// the process, so the deferred client.Close() always runs and callers can
// handle the failure.
func ExampleCreateSubscription() error {
	client, err := hstream.NewHStreamClient(YourHStreamServiceUrl)
	if err != nil {
		log.Printf("Creating client error: %s", err)
		return err
	}
	defer client.Close()

	streamName := "testStream"
	subId0 := "SubscriptionId0"
	subId1 := "SubscriptionId1"

	// Create a new subscription with ack timeout = 60s, max unAcked records num set to 10000 and set
	// subscriptionOffset to Earliest
	if err = client.CreateSubscription(subId0, streamName,
		hstream.WithAckTimeout(60),
		hstream.WithMaxUnackedRecords(10000),
		hstream.WithOffset(hstream.EARLIEST)); err != nil {
		log.Printf("Creating subscription error: %s", err)
		return err
	}

	// Create a second subscription with ack timeout = 600, max unAcked records num set to 5000 and
	// subscriptionOffset set to Latest
	if err = client.CreateSubscription(subId1, streamName,
		hstream.WithAckTimeout(600),
		hstream.WithMaxUnackedRecords(5000),
		hstream.WithOffset(hstream.LATEST)); err != nil {
		log.Printf("Creating subscription error: %s", err)
		return err
	}

	return nil
}
python
# https://github.com/hstreamdb/hstreamdb-py/blob/main/examples/snippets/guides.py
import asyncio
import hstreamdb
import os

# NOTE: Replace with your own host and port
host = os.getenv("GUIDE_HOST", "127.0.0.1")
# Environment variables are always strings; convert so that ``port`` is an int
# whether it comes from GUIDE_PORT or from the default.
port = int(os.getenv("GUIDE_PORT", "6570"))
stream_name = "your_stream"
subscription = "your_subscription"


# Run: asyncio.run(main(your_async_function))
async def main(*funcs):
    """Await each coroutine function in ``funcs`` in order, passing it one shared client.

    The client comes from ``hstreamdb.insecure_client`` using the module-level
    ``host`` and ``port`` and is used as an async context manager.
    """
    connection = await hstreamdb.insecure_client(host=host, port=port)
    async with connection as client:
        for func in funcs:
            await func(client)


async def create_subscription(client):
    """Create the module-level ``subscription`` on ``stream_name`` through ``client``.

    Passes ``ack_timeout=600``, ``max_unacks=10000`` and the EARLIEST special
    offset as keyword options.
    """
    options = {
        "ack_timeout": 600,
        "max_unacks": 10000,
        "offset": hstreamdb.SpecialOffset.EARLIEST,
    }
    await client.create_subscription(subscription, stream_name, **options)

删除一个订阅

要删除一个订阅,你需要确保没有活跃的订阅消费者,除非启用强制删除。

强制删除一个 Subscription

如果你确实想删除一个 subscription,并且有消费者正在运行,请启用强制删除。当强制删除一个 subscription 时,该订阅将处于删除中的状态,并关闭正在运行的消费者,这意味着你将无法加入、删除或创建一个同名的 subscription。在删除完成后,你可以用同样的名字创建一个订阅,这个订阅将是一个全新的订阅。即使他们订阅的是同一个流,这个新的订阅也不会与被删除的订阅共享消费进度。

java
package docs.code.examples;

import io.hstream.HStreamClient;

/** Example showing how to delete a subscription, with or without force. */
public class DeleteSubscriptionExample {
  /**
   * Builds a client, deletes the example subscription and closes the client.
   *
   * <p>The service URL defaults to a local address and can be overridden through the {@code
   * serviceUrl} environment variable.
   *
   * @param args unused
   * @throws Exception if deleting the subscription or closing the client fails
   */
  public static void main(String[] args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    // String serviceUrl = "your-service-url-address";
    String serviceUrl = "hstream://127.0.0.1:6570";
    String serviceUrlFromEnv = System.getenv("serviceUrl");
    if (serviceUrlFromEnv != null) {
      serviceUrl = serviceUrlFromEnv;
    }

    String subscriptionId = "your_subscription_id";
    HStreamClient client = HStreamClient.builder().serviceUrl(serviceUrl).build();
    try {
      deleteSubscriptionExample(client, subscriptionId);
    } finally {
      // Release the client even when the deletion throws.
      client.close();
    }
  }

  /**
   * Deletes the subscription without passing a force flag.
   *
   * @param client the client used to issue the request
   * @param subscriptionId id of the subscription to delete
   */
  public static void deleteSubscriptionExample(HStreamClient client, String subscriptionId) {
    client.deleteSubscription(subscriptionId);
  }

  /**
   * Deletes the subscription with the force flag set to {@code true}.
   *
   * @param client the client used to issue the request
   * @param subscriptionId id of the subscription to delete
   */
  public static void deleteSubscriptionForceExample(HStreamClient client, String subscriptionId) {
    client.deleteSubscription(subscriptionId, true);
  }
}
go
package examples

import (
	"github.com/hstreamdb/hstreamdb-go/hstream"
	"log"
)

// ExampleDeleteSubscription deletes one subscription with force enabled and
// another without it. It returns the first error encountered instead of
// exiting the process, so the deferred client.Close() always runs and callers
// can handle the failure.
func ExampleDeleteSubscription() error {
	client, err := hstream.NewHStreamClient(YourHStreamServiceUrl)
	if err != nil {
		log.Printf("Creating client error: %s", err)
		return err
	}
	defer client.Close()

	subId0 := "SubscriptionId0"
	subId1 := "SubscriptionId1"

	// force delete subscription
	if err = client.DeleteSubscription(subId0, true); err != nil {
		log.Printf("Force deleting subscription error: %s", err)
		return err
	}

	// delete subscription
	if err = client.DeleteSubscription(subId1, false); err != nil {
		log.Printf("Deleting subscription error: %s", err)
		return err
	}

	return nil
}
python
# https://github.com/hstreamdb/hstreamdb-py/blob/main/examples/snippets/guides.py
import asyncio
import hstreamdb
import os

# NOTE: Replace with your own host and port
host = os.getenv("GUIDE_HOST", "127.0.0.1")
# Environment variables are always strings; convert so that ``port`` is an int
# whether it comes from GUIDE_PORT or from the default.
port = int(os.getenv("GUIDE_PORT", "6570"))
stream_name = "your_stream"
subscription = "your_subscription"


# Run: asyncio.run(main(your_async_function))
async def main(*funcs):
    """Await each coroutine function in ``funcs`` in order, passing it one shared client.

    The client comes from ``hstreamdb.insecure_client`` using the module-level
    ``host`` and ``port`` and is used as an async context manager.
    """
    connection = await hstreamdb.insecure_client(host=host, port=port)
    async with connection as client:
        for func in funcs:
            await func(client)


async def delete_subscription(client):
    """Delete the module-level ``subscription`` through ``client`` with ``force=True``."""
    deletion = client.delete_subscription(subscription, force=True)
    await deletion

列出 HStream 中的 subscription 信息

java
package docs.code.examples;

import io.hstream.HStreamClient;
import io.hstream.Subscription;
import java.util.List;

/** Example showing how to list subscriptions and print their ids. */
public class ListSubscriptionsExample {

  /**
   * Builds a client, lists the subscriptions and closes the client.
   *
   * <p>The service URL defaults to a local address and can be overridden through the {@code
   * serviceUrl} environment variable.
   *
   * @param args unused
   * @throws Exception if listing the subscriptions or closing the client fails
   */
  public static void main(String[] args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String serviceUrl = "hstream://127.0.0.1:6570";
    String serviceUrlFromEnv = System.getenv("serviceUrl");
    if (serviceUrlFromEnv != null) {
      serviceUrl = serviceUrlFromEnv;
    }

    HStreamClient client = HStreamClient.builder().serviceUrl(serviceUrl).build();
    try {
      listSubscriptionExample(client);
    } finally {
      // Release the client even when listing throws.
      client.close();
    }
  }

  /**
   * Prints the id of every subscription returned by the client, one per line.
   *
   * @param client the client used to issue the request
   */
  public static void listSubscriptionExample(HStreamClient client) {
    List<Subscription> subscriptions = client.listSubscriptions();
    for (Subscription subscription : subscriptions) {
      System.out.println(subscription.getSubscriptionId());
    }
  }
}
go
package examples

import (
	"fmt"
	"github.com/hstreamdb/hstreamdb-go/hstream"
	"log"
)

// ExampleListSubscriptions lists the subscriptions and prints each one. It
// returns the first error encountered instead of exiting the process, so the
// deferred client.Close() always runs and callers can handle the failure.
func ExampleListSubscriptions() error {
	client, err := hstream.NewHStreamClient(YourHStreamServiceUrl)
	if err != nil {
		log.Printf("Creating client error: %s", err)
		return err
	}
	defer client.Close()

	subscriptions, err := client.ListSubscriptions()
	if err != nil {
		log.Printf("Listing subscriptions error: %s", err)
		return err
	}

	// %+v prints struct values together with their field names.
	for _, sub := range subscriptions {
		fmt.Printf("%+v\n", sub)
	}

	return nil
}
python
# https://github.com/hstreamdb/hstreamdb-py/blob/main/examples/snippets/guides.py
import asyncio
import hstreamdb
import os

# NOTE: Replace with your own host and port
host = os.getenv("GUIDE_HOST", "127.0.0.1")
# Environment variables are always strings; convert so that ``port`` is an int
# whether it comes from GUIDE_PORT or from the default.
port = int(os.getenv("GUIDE_PORT", "6570"))
stream_name = "your_stream"
subscription = "your_subscription"


# Run: asyncio.run(main(your_async_function))
async def main(*funcs):
    """Await each coroutine function in ``funcs`` in order, passing it one shared client.

    The client comes from ``hstreamdb.insecure_client`` using the module-level
    ``host`` and ``port`` and is used as an async context manager.
    """
    connection = await hstreamdb.insecure_client(host=host, port=port)
    async with connection as client:
        for func in funcs:
            await func(client)


async def list_subscriptions(client):
    """Print every item returned by ``client.list_subscriptions()``, one per line."""
    for item in await client.list_subscriptions():
        print(item)