Test Pulsar Functions in CI (#434)

This commit is contained in:
Lari Hotari 2024-01-17 04:12:37 -08:00 committed by GitHub
parent cfa156f738
commit e6ccd93d4f
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 25 additions and 13 deletions

View File

@ -69,7 +69,7 @@ ci::test_pulsar_admin_api_access
# test producer/consumer
ci::test_pulsar_producer_consumer "${test_action}"
if [[ "x${FUNCTION}" == "xtrue" ]]; then
if [[ "$(ci::helm_values_for_deployment | yq .components.functions)" == "true" ]]; then
# test functions
ci::test_pulsar_function
fi

View File

@ -254,6 +254,11 @@ function ci::test_pulsar_admin_api_access() {
ci::retry ${KUBECTL} exec -n ${NAMESPACE} ${CLUSTER}-toolset-0 -- bin/pulsar-admin tenants list
}
function ci::test_create_test_namespace() {
${KUBECTL} exec -n ${NAMESPACE} ${CLUSTER}-toolset-0 -- bin/pulsar-admin tenants create pulsar-ci
${KUBECTL} exec -n ${NAMESPACE} ${CLUSTER}-toolset-0 -- bin/pulsar-admin namespaces create pulsar-ci/test
}
function ci::test_pulsar_producer_consumer() {
action="${1:-"produce-consume"}"
echo "Testing with ${action}"
@ -264,8 +269,7 @@ function ci::test_pulsar_producer_consumer() {
fi
set -x
if [[ "${action}" == "produce" || "${action}" == "produce-consume" ]]; then
${KUBECTL} exec -n ${NAMESPACE} ${CLUSTER}-toolset-0 -- bin/pulsar-admin tenants create pulsar-ci
${KUBECTL} exec -n ${NAMESPACE} ${CLUSTER}-toolset-0 -- bin/pulsar-admin namespaces create pulsar-ci/test
ci::test_create_test_namespace
${KUBECTL} exec -n ${NAMESPACE} ${CLUSTER}-toolset-0 -- bin/pulsar-admin topics create pulsar-ci/test/test-topic
${KUBECTL} exec -n ${NAMESPACE} ${CLUSTER}-toolset-0 -- bin/pulsar-admin topics create-subscription -s test pulsar-ci/test/test-topic
${KUBECTL} exec -n ${NAMESPACE} ${CLUSTER}-toolset-0 -- bin/pulsar-client produce -m "test-message" pulsar-ci/test/test-topic
@ -280,31 +284,39 @@ function ci::test_pulsar_producer_consumer() {
}
function ci::wait_function_running() {
num_running=$(${KUBECTL} exec -n ${NAMESPACE} ${CLUSTER}-toolset-0 -- bash -c 'bin/pulsar-admin functions status --tenant pulsar-ci --namespace test --name test-function | bin/jq .numRunning')
num_running=$(${KUBECTL} exec -n ${NAMESPACE} ${CLUSTER}-toolset-0 -- bash -c 'bin/pulsar-admin functions status --tenant pulsar-ci --namespace test --name test-function' | jq .numRunning)
while [[ ${num_running} -lt 1 ]]; do
echo ${num_running}
echo "Waiting 15 seconds for function to be running"
sleep 15
${KUBECTL} get pods -n ${NAMESPACE} --field-selector=status.phase=Running
${KUBECTL} get events --sort-by=.lastTimestamp -A | tail -n 30 || true
num_running=$(${KUBECTL} exec -n ${NAMESPACE} ${CLUSTER}-toolset-0 -- bash -c 'bin/pulsar-admin functions status --tenant pulsar-ci --namespace test --name test-function | bin/jq .numRunning')
num_running=$(${KUBECTL} exec -n ${NAMESPACE} ${CLUSTER}-toolset-0 -- bash -c 'bin/pulsar-admin functions status --tenant pulsar-ci --namespace test --name test-function' | jq .numRunning)
done
}
function ci::wait_message_processed() {
num_processed=$(${KUBECTL} exec -n ${NAMESPACE} ${CLUSTER}-toolset-0 -- bash -c 'bin/pulsar-admin functions stats --tenant pulsar-ci --namespace test --name test-function | bin/jq .processedSuccessfullyTotal')
num_processed=$(${KUBECTL} exec -n ${NAMESPACE} ${CLUSTER}-toolset-0 -- bash -c 'bin/pulsar-admin functions stats --tenant pulsar-ci --namespace test --name test-function' | jq .processedSuccessfullyTotal)
while [[ ${num_processed} -lt 1 ]]; do
echo ${num_processed}
echo "Waiting 15 seconds for message to be processed"
sleep 15
${KUBECTL} exec -n ${NAMESPACE} ${CLUSTER}-toolset-0 -- bin/pulsar-admin functions stats --tenant pulsar-ci --namespace test --name test-function
num_processed=$(${KUBECTL} exec -n ${NAMESPACE} ${CLUSTER}-toolset-0 -- bash -c 'bin/pulsar-admin functions stats --tenant pulsar-ci --namespace test --name test-function | bin/jq .processedSuccessfullyTotal')
num_processed=$(${KUBECTL} exec -n ${NAMESPACE} ${CLUSTER}-toolset-0 -- bash -c 'bin/pulsar-admin functions stats --tenant pulsar-ci --namespace test --name test-function' | jq .processedSuccessfullyTotal)
done
}
function ci::test_pulsar_function() {
echo "Testing functions"
echo "Creating function"
${KUBECTL} exec -n ${NAMESPACE} ${CLUSTER}-toolset-0 -- bin/pulsar-admin functions create --tenant pulsar-ci --namespace test --name test-function --inputs "pulsar-ci/test/test_input" --output "pulsar-ci/test/test_output" --parallelism 1 --classname org.apache.pulsar.functions.api.examples.ExclamationFunction --jar /pulsar/examples/api-examples.jar
echo "Creating subscription for output topic"
${KUBECTL} exec -n ${NAMESPACE} ${CLUSTER}-toolset-0 -- bin/pulsar-admin topics create-subscription -s test pulsar-ci/test/test_output
echo "Waiting for function to be ready"
# wait until the function is running
# TODO: re-enable function test
# ci::wait_function_running
# ${KUBECTL} exec -n ${NAMESPACE} ${CLUSTER}-toolset-0 -- bin/pulsar-client produce -m "hello pulsar function!" pulsar-ci/test/test_input
# ci::wait_message_processed
ci::wait_function_running
echo "Sending input message"
${KUBECTL} exec -n ${NAMESPACE} ${CLUSTER}-toolset-0 -- bin/pulsar-client produce -m 'hello pulsar function!' pulsar-ci/test/test_input
echo "Waiting for message to be processed"
ci::wait_message_processed
echo "Consuming output message"
${KUBECTL} exec -n ${NAMESPACE} ${CLUSTER}-toolset-0 -- bin/pulsar-client consume -s test pulsar-ci/test/test_output
}