Public
Script
Readme

Upstash qstash client for vals.

Lets you publish and receive qstash messages in vals.

To use, set up qstash on Upstash and set the environment variables.

  • QSTASH_TOKEN
  • QSTASH_CURRENT_SIGNING_KEY
  • QSTASH_NEXT_SIGNING_KEY

Calls are also traced via https://www.val.town/v/saolsen/tracing so if you use that library you will see linked spans between the publish call and the receive call.

For an example of how to use it see https://www.val.town/v/saolsen/try_qstash_publish and https://www.val.town/v/saolsen/try_qstash_receive.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
import {
context,
propagation,
SpanStatusCode,
trace,
} from "https://cdn.skypack.dev/@opentelemetry/api";
import { Client, Receiver } from "npm:@upstash/qstash";
import { get_tracer, traced } from "https://esm.town/v/saolsen/tracing?v=136";
const client = new Client({
token: Deno.env.get("QSTASH_TOKEN")!,
});
export type PublishOpts = { url?: string; topic?: string };
export async function publish(
body: object,
opts: PublishOpts
): Promise<{ messageId: string }> {
if ((opts.url && opts.topic) || (!opts.url && !opts.topic)) {
throw new Error("Must set one of url or topic.");
}
return await get_tracer().startActiveSpan(`qstash:publish`, async (span) => {
try {
const prop_output: { b3: string } = { b3: "" };
propagation.inject(context.active(), prop_output);
const result = await client.publishJSON({
body,
headers: prop_output,
...opts,
});
if (span.isRecording()) {
span.setAttributes({
"qstash:messageId": result.messageId,
});
span.setStatus({ code: SpanStatusCode.OK });
}
return result;
} catch (error) {
if (span.isRecording()) {
span.setStatus({
code: SpanStatusCode.ERROR,
message: error.message,
});
}
throw new Error(error);
} finally {
span.end();
}
});
}
const receiver = new Receiver({
currentSigningKey: Deno.env.get("QSTASH_CURRENT_SIGNING_KEY")!,
nextSigningKey: Deno.env.get("QSTASH_NEXT_SIGNING_KEY")!,
});
export async function receive(
body: string,
headers: Headers
): Promise<object | null> {
const is_valid = await receiver
.verify({
signature: headers.get("upstash-signature")!,
body,
})
.catch((e) => {
console.error(e);
false;
});
if (!is_valid) {
return null;
}
let span = trace.getSpan(context.active());
if (span) {
span.setAttributes({
"qstash:messageId": headers.get("upstash-message-id"),
});
}
return JSON.parse(body);
}
Val Town is a social website to write and deploy JavaScript.
Build APIs and schedule functions from your browser.
Comments
Nobody has commented on this val yet: be the first!
January 5, 2024