Public
Script
Val Town is a social website to write and deploy JavaScript.
Build APIs and schedule functions from your browser.
Readme

Upstash QStash client for vals.

Lets you publish and receive QStash messages in vals.

To use it, set up QStash on Upstash and set the following environment variables:

  • QSTASH_TOKEN
  • QSTASH_CURRENT_SIGNING_KEY
  • QSTASH_NEXT_SIGNING_KEY

Calls are also traced via https://www.val.town/v/saolsen/tracing, so if you use that library, you will see linked spans between the publish call and the receive call.

For an example of how to use it, see https://www.val.town/v/saolsen/try_qstash_publish and https://www.val.town/v/saolsen/try_qstash_receive.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
import {
context,
propagation,
SpanStatusCode,
trace,
} from "https://cdn.skypack.dev/@opentelemetry/api";
import { Client, Receiver } from "npm:@upstash/qstash";
import { get_tracer, traced } from "https://esm.town/v/saolsen/tracing?v=136";
// Module-wide QStash client used by `publish`; it authenticates with the
// token read from the QSTASH_TOKEN environment variable at import time.
const client = new Client({ token: Deno.env.get("QSTASH_TOKEN")! });
/**
 * Destination options for `publish`.
 *
 * Both fields are optional at the type level; `publish` checks at runtime
 * that one (and only one) of them is set.
 */
export type PublishOpts = {
  /** Destination URL for the message. */
  url?: string;
  /** Destination topic for the message. */
  topic?: string;
};
/**
 * Publishes a JSON message through the module's QStash client, wrapped in a
 * `qstash:publish` tracing span.
 *
 * The active trace context is injected into a carrier object that is sent as
 * the message headers, so the receiving side can link its span to this call.
 *
 * @param body - Payload handed to `client.publishJSON` as the message body.
 * @param opts - Destination; exactly one of `url` or `topic` must be set.
 * @returns The result of `client.publishJSON`, which carries the `messageId`.
 * @throws Error when both or neither of `url` / `topic` are set. Any error
 *   raised while publishing is recorded on the span and rethrown unchanged.
 */
export async function publish(
  body: object,
  opts: PublishOpts
): Promise<{ messageId: string }> {
  // Exactly one destination is allowed: reject "both" and "neither" alike.
  const hasUrl = Boolean(opts.url);
  const hasTopic = Boolean(opts.topic);
  if (hasUrl === hasTopic) {
    throw new Error("Must set one of url or topic.");
  }
  return await get_tracer().startActiveSpan(`qstash:publish`, async (span) => {
    try {
      // Carrier for the propagated trace context; it is pre-seeded with an
      // empty `b3` key and filled in by `propagation.inject`.
      const prop_output: { b3: string } = { b3: "" };
      propagation.inject(context.active(), prop_output);
      const result = await client.publishJSON({
        body,
        headers: prop_output,
        ...opts,
      });
      if (span.isRecording()) {
        span.setAttributes({
          "qstash:messageId": result.messageId,
        });
        span.setStatus({ code: SpanStatusCode.OK });
      }
      return result;
    } catch (error) {
      if (span.isRecording()) {
        span.setStatus({
          code: SpanStatusCode.ERROR,
          // The caught value is not guaranteed to be an Error instance.
          message: error instanceof Error ? error.message : String(error),
        });
      }
      // Rethrow the original error: wrapping it in `new Error(error)` would
      // stringify it, dropping its class and stack trace.
      throw error;
    } finally {
      span.end();
    }
  });
}
// Module-wide signature verifier used by `receive`. Both signing keys are
// read from the environment at import time: QSTASH_CURRENT_SIGNING_KEY and
// QSTASH_NEXT_SIGNING_KEY.
const receiver = new Receiver({
  currentSigningKey: Deno.env.get("QSTASH_CURRENT_SIGNING_KEY")!,
  nextSigningKey: Deno.env.get("QSTASH_NEXT_SIGNING_KEY")!,
});
/**
 * Verifies an incoming QStash request and returns its parsed JSON body.
 *
 * The `upstash-signature` header is checked against `body` with the module's
 * `Receiver`. When a span is active, the `upstash-message-id` header (if
 * present) is recorded on it as `qstash:messageId`.
 *
 * @param body - Raw request body, exactly as received (it is the signed text).
 * @param headers - Request headers.
 * @returns The value of `JSON.parse(body)`, or `null` when the signature
 *   header is missing or verification fails or throws (the failure is logged
 *   with `console.error`).
 * @throws SyntaxError from `JSON.parse` if a verified body is not valid JSON.
 */
export async function receive(
  body: string,
  headers: Headers
): Promise<object | null> {
  const signature = headers.get("upstash-signature");
  if (signature === null) {
    // Nothing to verify against; treat it like any other failed verification.
    console.error(new Error("Missing upstash-signature header."));
    return null;
  }

  let is_valid = false;
  try {
    is_valid = await receiver.verify({ signature, body });
  } catch (e) {
    // A throwing verifier means the request is rejected, not that we crash.
    console.error(e);
  }
  if (!is_valid) {
    return null;
  }

  const span = trace.getSpan(context.active());
  const messageId = headers.get("upstash-message-id");
  // Only record the attribute when the header is actually there; a null
  // attribute value carries no information.
  if (span && messageId !== null) {
    span.setAttributes({
      "qstash:messageId": messageId,
    });
  }
  return JSON.parse(body);
}
January 5, 2024