---
name: azure-eventhub-ts
description: "High-throughput event streaming and real-time data ingestion."
risk: unknown
source: community
date_added: "2026-02-27"
---
# Azure Event Hubs SDK for TypeScript
High-throughput event streaming and real-time data ingestion.
## Installation
```bash
npm install @azure/event-hubs @azure/identity
```
For checkpointing with consumer groups:
```bash
npm install @azure/eventhubs-checkpointstore-blob @azure/storage-blob
```
## Environment Variables
```bash
EVENTHUB_NAMESPACE=<namespace>.servicebus.windows.net
EVENTHUB_NAME=my-eventhub
STORAGE_ACCOUNT_NAME=<storage-account>
STORAGE_CONTAINER_NAME=checkpoints
```
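The snippets in this document read these variables with non-null assertions (`!`), which hides a missing value until a client call fails. As a minimal sketch, the variables can be validated once at startup. `requireEnv` and `loadEventHubConfig` are hypothetical helpers introduced here for illustration; they are not part of the SDK.

```typescript
// Hypothetical helpers (not part of @azure/event-hubs): fail fast on missing configuration.
type Env = Record<string, string | undefined>;

interface EventHubConfig {
  fullyQualifiedNamespace: string;
  eventHubName: string;
  storageAccountName: string;
  storageContainerName: string;
}

// Returns the variable's value, or throws if it is unset or blank.
function requireEnv(env: Env, name: string): string {
  const value = env[name];
  if (value === undefined || value.trim() === "") {
    throw new Error(`Missing required environment variable: ${name}`);
  }
  return value;
}

// Collects the four variables listed above into one typed object.
function loadEventHubConfig(env: Env): EventHubConfig {
  return {
    fullyQualifiedNamespace: requireEnv(env, "EVENTHUB_NAMESPACE"),
    eventHubName: requireEnv(env, "EVENTHUB_NAME"),
    storageAccountName: requireEnv(env, "STORAGE_ACCOUNT_NAME"),
    storageContainerName: requireEnv(env, "STORAGE_CONTAINER_NAME"),
  };
}
```

In a Node.js process this would typically be called as `loadEventHubConfig(process.env)`. Passing the environment in as a parameter keeps the helper easy to test.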
## Authentication
```typescript
import { EventHubProducerClient, EventHubConsumerClient } from "@azure/event-hubs";
import { DefaultAzureCredential } from "@azure/identity";
const fullyQualifiedNamespace = process.env.EVENTHUB_NAMESPACE!;
const eventHubName = process.env.EVENTHUB_NAME!;
const credential = new DefaultAzureCredential();
// Producer
const producer = new EventHubProducerClient(fullyQualifiedNamespace, eventHubName, credential);
// Consumer
const consumer = new EventHubConsumerClient(
"$Default", // Consumer group
fullyQualifiedNamespace,
eventHubName,
credential
);
```
## Core Workflow
### Send Events
```typescript
const producer = new EventHubProducerClient(fullyQualifiedNamespace, eventHubName, credential);
// Create a batch and add events; tryAdd returns false when an event does not fit
const batch = await producer.createBatch();
const readings = [
  { temperature: 72.5, deviceId: "sensor-1" },
  { temperature: 68.2, deviceId: "sensor-2" },
];
for (const body of readings) {
  if (!batch.tryAdd({ body })) {
    throw new Error("Event does not fit in the batch");
  }
}
await producer.sendBatch(batch);
await producer.close();
```
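When there are more events than fit in a single batch, `tryAdd` starts returning `false` and the batch has to be sent and replaced. The following is a minimal sketch of that rollover loop, not the SDK's own helper. `sendAll`, `EventLike`, `BatchLike`, and `BatchSender` are hypothetical names; the interfaces mirror only the subset of the producer and batch API used above (`createBatch`, `tryAdd`, `count`, `sendBatch`) so the sketch compiles without the SDK installed.

```typescript
// Minimal structural types (assumptions) covering only the members this sketch uses.
interface EventLike {
  body: unknown;
}
interface BatchLike {
  tryAdd(event: EventLike): boolean;
  readonly count: number;
}
interface BatchSender<B extends BatchLike> {
  createBatch(): Promise<B>;
  sendBatch(batch: B): Promise<void>;
}

// Hypothetical helper: sends every event, starting a new batch whenever the
// current one is full. Resolves to the number of batches sent.
async function sendAll<B extends BatchLike>(
  producer: BatchSender<B>,
  events: EventLike[]
): Promise<number> {
  let batch = await producer.createBatch();
  let batchesSent = 0;
  for (const event of events) {
    if (batch.tryAdd(event)) continue;
    // An empty batch that rejects an event means the event alone is too large.
    if (batch.count === 0) {
      throw new Error("Event is too large to fit in an empty batch");
    }
    await producer.sendBatch(batch);
    batchesSent++;
    batch = await producer.createBatch();
    if (!batch.tryAdd(event)) {
      throw new Error("Event is too large to fit in an empty batch");
    }
  }
  // Flush whatever is left in the final, partially filled batch.
  if (batch.count > 0) {
    await producer.sendBatch(batch);
    batchesSent++;
  }
  return batchesSent;
}
```

The intent is that an `EventHubProducerClient` can be passed as `producer`, but that depends on structural compatibility with the installed SDK version, so treat it as an assumption to verify.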
### Send to Specific Partition
```typescript
// By partition ID
const batchById = await producer.createBatch({ partitionId: "0" });
// By partition key (events with the same key are routed to the same partition)
const batchByKey = await producer.createBatch({ partitionKey: "device-123" });
```
### Receive Events (Simple)
```typescript
const consumer = new EventHubConsumerClient("$Default", fullyQualifiedNamespace, eventHubName, credential);
const subscription = consumer.subscribe({
processEvents: async (events, context) => {
for (const event of events) {
console.log(`Partition: ${context.partitionId}, Body: ${JSON.stringify(event.body)}`);
}
},
processError: async (err, context) => {
console.error(`Error on partition ${context.partitionId}: ${err.message}`);
},
});
// Stop after some time
setTimeout(async () => {
await subscription.close();
await consumer.close();
}, 60000);
```
### Receive with Checkpointing (Production)
```typescript
import { EventHubConsumerClient } from "@azure/event-hubs";
import { ContainerClient } from "@azure/storage-blob";
import { BlobCheckpointStore } from "@azure/eventhubs-checkpointstore-blob";
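// Storage settings come from the environment variables listed earlier
const storageAccount = process.env.STORAGE_ACCOUNT_NAME!;
const containerName = process.env.STORAGE_CONTAINER_NAME!;
const containerClient = new ContainerClient(
`https://${storageAccount}.blob.core.windows.net/${containerName}`,
credential
);
const checkpointStore = new BlobCheckpointStore(containerClient);
const consumer = new EventHubConsumerClient(
"$Default",
fullyQualifiedNamespace,
eventHubName,
credential,
checkpointStore
);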
const subscription = consumer.subscribe({
processEvents: async (events, context) => {
for (const event of events) {
console.log(`Processing: ${JSON.stringify(event.body)}`);
}
// Checkpoint after processing batch
if (events.length > 0) {
await context.updateCheckpoint(events[events.length - 1]);
}
},
processError: async (err, context) => {
console.error(`Error: ${err.message}`);
},
});
```
### Receive from Specific Position
```typescript
import { earliestEventPosition, latestEventPosition } from "@azure/event-hubs";

const subscription = consumer.subscribe({
  processEvents: async (events, context) => { /* ... */ },
  processError: async (err, context) => { /* ... */ },
}, {
  startPosition: {
    // Start from the beginning of the partition
    "0": earliestEventPosition,
    // Start from the end (new events only)
    "1": latestEventPosition,
    // Start from a specific offset (string or number, depending on the SDK major version)
    "2": { offset: "12345" },
    // Start from a specific enqueue time
    "3": { enqueuedOn: new Date("2024-01-01") },
  },
});
```
## Event Hub Properties
```typescript
// Get hub info
const hubProperties = await producer.getEventHubProperties();
console.log(`Partitions: ${hubProperties.partitionIds}`);
// Get partition info
const partitionProperties = await producer.getPartitionProperties("0");
console.log(`Last sequence: ${partitionProperties.lastEnqueuedSequenceNumber}`);
```
## Batch Processing Options
```typescript
const subscription = consumer.subscribe(
{
processEvents: async (events, context) => { /* ... */ },
processError: async (err, context) => { /* ... */ },
},
{
maxBatchSize: 100, // Max events per batch
maxWaitTimeInSeconds: 30, // Max wait for batch
}
);
```
## Key Types
```typescript
import {
EventHubProducerClient,
EventHubConsumerClient,
EventData,
ReceivedEventData,
PartitionContext,
Subscription,
SubscriptionEventHandlers,
CreateBatchOptions,
EventPosition,
} from "@azure/event-hubs";
import { BlobCheckpointStore } from "@azure/eventhubs-checkpointstore-blob";
```
## Event Properties
```typescript
// Send with properties
const batch = await producer.createBatch();
batch.tryAdd({
body: { data: "payload" },
properties: {
eventType: "telemetry",
deviceId: "sensor-1",
},
contentType: "application/json",
correlationId: "request-123",
});
// Access in receiver
consumer.subscribe({
processEvents: async (events, context) => {
for (const event of events) {
console.log(`Type: ${event.properties?.eventType}`);
console.log(`Sequence: ${event.sequenceNumber}`);
console.log(`Enqueued: ${event.enqueuedTimeUtc}`);
console.log(`Offset: ${event.offset}`);
}
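},
// processError is a required handler
processError: async (err, context) => {
console.error(`Error on partition ${context.partitionId}: ${err.message}`);
},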
});
```
## Error Handling
```typescript
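// processEvent is an application-defined async handler (not part of the SDK)
consumer.subscribe({
processEvents: async (events, context) => {
// The handler can be called with no events when the wait time elapses
if (events.length === 0) return;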
try {
for (const event of events) {
await processEvent(event);
}
await context.updateCheckpoint(events[events.length - 1]);
} catch (error) {
// Don't checkpoint on error - events will be reprocessed
console.error("Processing failed:", error);
}
},
processError: async (err, context) => {
if (err.name === "MessagingError") {
// Transient error - SDK will retry
console.warn("Transient error:", err.message);
} else {
// Fatal error
console.error("Fatal error:", err);
}
},
});
```
## Best Practices
1. **Use checkpointing** - Always checkpoint in production so processing resumes from the last checkpoint after a restart; delivery is at-least-once, so keep handlers idempotent
2. **Batch sends** - Use `createBatch()` for efficient sending
3. **Partition keys** - Use partition keys to ensure ordering for related events
4. **Consumer groups** - Use separate consumer groups for different processing pipelines
5. **Handle errors gracefully** - Don't checkpoint on processing failures
6. **Close clients** - Always close producer/consumer when done
7. **Monitor lag** - Track `lastEnqueuedSequenceNumber` vs processed sequence
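
Practice 7 can be sketched as a small pure function. `computeLag` is a hypothetical helper, not an SDK API. It assumes you collect `lastEnqueuedSequenceNumber` per partition from `getPartitionProperties` and record the sequence number of the last event you processed on each partition yourself.

```typescript
// Hypothetical helper: per-partition lag, measured in sequence numbers.
// lastEnqueued:  partition ID -> lastEnqueuedSequenceNumber reported by the hub
// lastProcessed: partition ID -> sequenceNumber of the last event the consumer handled
function computeLag(
  lastEnqueued: Record<string, number>,
  lastProcessed: Record<string, number>
): Record<string, number> {
  const lag: Record<string, number> = {};
  for (const partitionId of Object.keys(lastEnqueued)) {
    const enqueued = lastEnqueued[partitionId];
    const processed = lastProcessed[partitionId];
    // Partitions with nothing processed yet are skipped: their lag is unknown in this sketch.
    if (enqueued === undefined || processed === undefined) continue;
    // Clamp at zero in case the enqueued snapshot is older than the processed one.
    lag[partitionId] = Math.max(0, enqueued - processed);
  }
  return lag;
}

// Example with made-up numbers: partition "0" is 10 events behind, "1" is caught up.
const exampleLag = computeLag({ "0": 100, "1": 50, "2": 7 }, { "0": 90, "1": 50 });
console.log(JSON.stringify(exampleLag)); // {"0":10,"1":0}
```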
## When to Use
Use this skill when a task involves sending events to, or receiving events from, Azure Event Hubs with the TypeScript SDK, as described in the sections above.
## Limitations
- Use this skill only when the task clearly matches the scope described above.
- Do not treat the output as a substitute for environment-specific validation, testing, or expert review.
- Stop and ask for clarification if required inputs, permissions, safety boundaries, or success criteria are missing.
Published: 5/16/2026

Provider: SkillOPIC

Source type: Imported
sickn33
coding