import boto3
import sagemaker
import pandas as pd
from pprint import pprint
import botocore

config = botocore.config.Config(user_agent_extra='dlai-pds/c3/w3')

# low-level service client of the boto3 session
sm = boto3.client(service_name='sagemaker',
                  config=config)

sm_runtime = boto3.client('sagemaker-runtime',
                          config=config)

sess = sagemaker.Session(sagemaker_client=sm,
                         sagemaker_runtime_client=sm_runtime)

bucket = sess.default_bucket()
role = sagemaker.get_execution_role()
region = sess.boto_region_name

s3 = boto3.Session().client(service_name='s3',
                            config=config)
cognito_idp = boto3.Session().client(service_name='cognito-idp',
                                     config=config)
a2i = boto3.Session().client(service_name='sagemaker-a2i-runtime',
                             config=config)
1 Introduction
In earlier articles we introduced AWS cloud services for data science, and showed how they can help with different stages of the data science & machine learning workflow.
In this project we will create our own human workforce, a human task UI, and then define the human review workflow to perform data labeling for an ML task. We will make the original predictions of the labels with the custom ML model, and then create a human loop if the probability scores are lower than the preset threshold. After the completion of the human loop tasks, we will review the results and prepare data for re-training.
Let’s install and import the required modules.
2 Set up Amazon Cognito user pool and define human workforce
The first step in the creation of the human-in-the-loop pipeline will be to create our own private workforce.
Amazon Cognito provides authentication, authorization, and user management for apps. This enables our workers to sign in directly to the labeling UI with a username and password.
We will construct an Amazon Cognito user pool, setting up its client, domain, and group. Then we’ll create a SageMaker workforce, linking it to the Cognito user pool. After that, we will create a SageMaker workteam, linking it to the Cognito user pool and group. Finally, we will create a pool user and add it to the group.
To get started, let’s construct the user pool and user pool client names.
import time

timestamp = int(time.time())

user_pool_name = 'groundtruth-user-pool-{}'.format(timestamp)
user_pool_client_name = 'groundtruth-user-pool-client-{}'.format(timestamp)

print("Amazon Cognito user pool name: {}".format(user_pool_name))
print("Amazon Cognito user pool client name: {}".format(user_pool_client_name))
Amazon Cognito user pool name: groundtruth-user-pool-1677153775
Amazon Cognito user pool client name: groundtruth-user-pool-client-1677153775
2.1 Create Amazon Cognito user pool
The function cognito_idp.create_user_pool creates a new Amazon Cognito user pool. By assigning the function result to a variable, we can inspect the response, which is returned as a dictionary.
create_user_pool_response = cognito_idp.create_user_pool(PoolName=user_pool_name)
user_pool_id = create_user_pool_response['UserPool']['Id']

print("Amazon Cognito user pool ID: {}".format(user_pool_id))
Amazon Cognito user pool ID: us-east-1_8s0SOCEPn
Let’s pull the Amazon Cognito user pool name from its description.
print(create_user_pool_response['UserPool'].keys())
dict_keys(['Id', 'Name', 'Policies', 'DeletionProtection', 'LambdaConfig', 'LastModifiedDate', 'CreationDate', 'SchemaAttributes', 'VerificationMessageTemplate', 'UserAttributeUpdateSettings', 'MfaConfiguration', 'EstimatedNumberOfUsers', 'EmailConfiguration', 'AdminCreateUserConfig', 'Arn'])
user_pool_name = create_user_pool_response['UserPool']['Name']
print('Amazon Cognito user pool name: {}'.format(user_pool_name))
Amazon Cognito user pool name: groundtruth-user-pool-1677153775
2.2 Create Amazon Cognito user pool client
Now let’s set up the Amazon Cognito user pool client for the user pool created above.
The Amazon Cognito user pool client implements OAuth, an open standard authorization framework. The standard enables apps to obtain limited access (scopes) to a user’s data without giving away the user’s password. It decouples authentication from authorization and supports multiple use cases addressing different device capabilities.
Let’s create the Amazon Cognito user pool client for the constructed user pool.
create_user_pool_client_response = cognito_idp.create_user_pool_client(
    UserPoolId=user_pool_id,
    ClientName=user_pool_client_name,
    GenerateSecret=True, # boolean to specify whether you want to generate a secret
    # a list of provider names for the identity providers that are supported on this client, e.g. Cognito, Facebook, Google
    SupportedIdentityProviders=[
        'COGNITO'
    ],
    # a list of the allowed OAuth flows, e.g. code, implicit, client_credentials
    AllowedOAuthFlows=[
        'code',
        'implicit'
    ],
    # a list of the allowed OAuth scopes, e.g. phone, email, openid, and profile
    AllowedOAuthScopes=[
        'email',
        'openid',
        'profile'
    ],
    # a list of allowed redirect (callback) URLs for the identity providers
    CallbackURLs=[
        'https://datascienceonaws.com',
    ],
    # set to true if the client is allowed to follow the OAuth protocol when interacting with Cognito user pools
    AllowedOAuthFlowsUserPoolClient=True
)

client_id = create_user_pool_client_response['UserPoolClient']['ClientId']
print('Amazon Cognito user pool client ID: {}'.format(client_id))
Amazon Cognito user pool client ID: 4ebq1ga0irfdvssomfjhbh5fgq
2.3 Create Amazon Cognito user pool domain and group
Now we set up the Amazon Cognito user pool domain for the constructed user pool.
user_pool_domain_name = 'groundtruth-user-pool-domain-{}'.format(timestamp)

try:
    cognito_idp.create_user_pool_domain(
        UserPoolId=user_pool_id,
        Domain=user_pool_domain_name
    )
    print("Created Amazon Cognito user pool domain: {}".format(user_pool_domain_name))
except:
    print("Amazon Cognito user pool domain {} already exists".format(user_pool_domain_name))
Created Amazon Cognito user pool domain: groundtruth-user-pool-domain-1677153775
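To make the client settings above more concrete, here is a minimal sketch of the sign-in URL that an authorization-code flow would start from. It assumes the domain is a Cognito prefix domain served at `https://<prefix>.auth.<region>.amazoncognito.com`; the helper name `build_authorize_url` is hypothetical, and the values are the ones printed in this article.

```python
from urllib.parse import urlencode, urlparse, parse_qs


def build_authorize_url(domain_prefix, region, client_id, redirect_uri, scopes):
    """Build an OAuth 2.0 authorization-code request URL (hypothetical helper)."""
    # Assumption: a Cognito prefix domain, not a custom domain
    base = "https://{}.auth.{}.amazoncognito.com/oauth2/authorize".format(domain_prefix, region)
    query = urlencode({
        "response_type": "code",        # matches the 'code' entry in AllowedOAuthFlows
        "client_id": client_id,
        "redirect_uri": redirect_uri,   # must be one of the CallbackURLs
        "scope": " ".join(scopes),      # scopes are sent space-separated
    })
    return base + "?" + query


url = build_authorize_url(
    domain_prefix="groundtruth-user-pool-domain-1677153775",
    region="us-east-1",
    client_id="4ebq1ga0irfdvssomfjhbh5fgq",
    redirect_uri="https://datascienceonaws.com",
    scopes=["email", "openid", "profile"],
)
print(url)
```

In this project the labeling portal handles sign-in for us, so the URL is only meant to show how the client ID, callback URL, and scopes fit together.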
We will use the following function to check if the Amazon Cognito user group already exists.
def check_user_pool_group_existence(user_pool_id, user_pool_group_name):
    for group in cognito_idp.list_groups(UserPoolId=user_pool_id)['Groups']:
        if user_pool_group_name == group['GroupName']:
            return True
    return False
Now we will set up the Amazon Cognito user group.
user_pool_group_name = 'groundtruth-user-pool-group-{}'.format(timestamp)

if not check_user_pool_group_existence(user_pool_id, user_pool_group_name):
    cognito_idp.create_group(
        UserPoolId=user_pool_id,
        GroupName=user_pool_group_name
    )
    print("Created Amazon Cognito user group: {}".format(user_pool_group_name))
else:
    print("Amazon Cognito user group {} already exists".format(user_pool_group_name))
Created Amazon Cognito user group: groundtruth-user-pool-group-1677153775
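The existence check above reads only the first page of results, which is enough for a freshly created pool. For a pool with many groups, a variant that follows the `NextToken` field of the response may be safer. The sketch below is one way to write it; `group_exists` and the `FakeCognito` stand-in are hypothetical names used so the example runs without AWS credentials.

```python
def group_exists(client, user_pool_id, group_name):
    """Return True if group_name exists, following NextToken across result pages."""
    kwargs = {"UserPoolId": user_pool_id}
    while True:
        page = client.list_groups(**kwargs)
        if any(g["GroupName"] == group_name for g in page.get("Groups", [])):
            return True
        token = page.get("NextToken")
        if not token:
            return False  # no more pages to inspect
        kwargs["NextToken"] = token


class FakeCognito:
    """Stand-in for the cognito-idp client that returns two pages (illustration only)."""

    def list_groups(self, UserPoolId, NextToken=None):
        if NextToken is None:
            return {"Groups": [{"GroupName": "group-a"}], "NextToken": "page-2"}
        return {"Groups": [{"GroupName": "group-b"}]}


print(group_exists(FakeCognito(), "pool-id", "group-b"))
print(group_exists(FakeCognito(), "pool-id", "group-c"))
```

With the real client, the call would be `group_exists(cognito_idp, user_pool_id, user_pool_group_name)`.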
2.4 Create workforce and workteam
We can use the following function to check if the workforce already exists. We can only create one workforce per region, therefore we’ll have to delete any other existing workforce, together with all of the related workteams.
def check_workforce_existence(workforce_name):
    for workforce in sm.list_workforces()['Workforces']:
        if workforce_name == workforce['WorkforceName']:
            return True
        else:
            for workteam in sm.list_workteams()['Workteams']:
                sm.delete_workteam(WorkteamName=workteam['WorkteamName'])
            sm.delete_workforce(WorkforceName=workforce['WorkforceName'])
    return False
Let’s create a workforce.
workforce_name = 'groundtruth-workforce-name-{}'.format(timestamp)

if not check_workforce_existence(workforce_name):
    create_workforce_response = sm.create_workforce(
        WorkforceName=workforce_name,
        CognitoConfig={
            'UserPool': user_pool_id,
            'ClientId': client_id
        }
    )
    print("Workforce name: {}".format(workforce_name))
    pprint(create_workforce_response)
else:
    print("Workforce {} already exists".format(workforce_name))
Workforce name: groundtruth-workforce-name-1677153775
{'ResponseMetadata': {'HTTPHeaders': {'content-length': '107',
'content-type': 'application/x-amz-json-1.1',
'date': 'Thu, 23 Feb 2023 12:04:42 GMT',
'x-amzn-requestid': '8e749026-4d1e-4758-949a-ab78fdfaafbe'},
'HTTPStatusCode': 200,
'RequestId': '8e749026-4d1e-4758-949a-ab78fdfaafbe',
'RetryAttempts': 0},
'WorkforceArn': 'arn:aws:sagemaker:us-east-1:753124839657:workforce/groundtruth-workforce-name-1677153775'}
We can use the sm.describe_workforce function to get information about the workforce.
describe_workforce_response = sm.describe_workforce(WorkforceName=workforce_name)
describe_workforce_response
We use the following function to check if the workteam already exists. If there are no workteams in the list, we will give some time for the workforce to set up.
def check_workteam_existence(workteam_name):
    if sm.list_workteams()['Workteams']:
        for workteam in sm.list_workteams()['Workteams']:
            if workteam_name == workteam['WorkteamName']:
                return True
    else:
        time.sleep(60)
        return False
    return False
Now let’s create a workteam.
workteam_name = 'groundtruth-workteam-{}'.format(timestamp)

if not check_workteam_existence(workteam_name):
    create_workteam_response = sm.create_workteam(
        Description='groundtruth workteam',
        WorkforceName=workforce_name,
        WorkteamName=workteam_name,
        # objects that identify the workers that make up the work team
        MemberDefinitions=[{
            'CognitoMemberDefinition': {
                'UserPool': user_pool_id,
                'ClientId': client_id,
                'UserGroup': user_pool_group_name
            }
        }]
    )
    pprint(create_workteam_response)
else:
    print("Workteam {} already exists".format(workteam_name))
{'ResponseMetadata': {'HTTPHeaders': {'content-length': '113',
'content-type': 'application/x-amz-json-1.1',
'date': 'Thu, 23 Feb 2023 12:06:06 GMT',
'x-amzn-requestid': 'bd89c3fa-45bb-439b-aa33-f2c685e69d8a'},
'HTTPStatusCode': 200,
'RequestId': 'bd89c3fa-45bb-439b-aa33-f2c685e69d8a',
'RetryAttempts': 0},
'WorkteamArn': 'arn:aws:sagemaker:us-east-1:753124839657:workteam/private-crowd/groundtruth-workteam-1677153775'}
We can use the sm.describe_workteam function to get information about the workteam.
describe_workteam_response = sm.describe_workteam(WorkteamName=workteam_name)
describe_workteam_response
{'Workteam': {'WorkteamName': 'groundtruth-workteam-1677153775',
'MemberDefinitions': [{'CognitoMemberDefinition': {'UserPool': 'us-east-1_8s0SOCEPn',
'UserGroup': 'groundtruth-user-pool-group-1677153775',
'ClientId': '4ebq1ga0irfdvssomfjhbh5fgq'}}],
'WorkteamArn': 'arn:aws:sagemaker:us-east-1:753124839657:workteam/private-crowd/groundtruth-workteam-1677153775',
'Description': 'groundtruth workteam',
'SubDomain': 'aqa042udc1.labeling.us-east-1.sagemaker.aws',
'CreateDate': datetime.datetime(2023, 2, 23, 12, 6, 5, 715000, tzinfo=tzlocal()),
'LastUpdatedDate': datetime.datetime(2023, 2, 23, 12, 6, 7, 175000, tzinfo=tzlocal()),
'NotificationConfiguration': {}},
'ResponseMetadata': {'RequestId': '615a618f-d243-4c27-a8d5-f94290f6c790',
'HTTPStatusCode': 200,
'HTTPHeaders': {'x-amzn-requestid': '615a618f-d243-4c27-a8d5-f94290f6c790',
'content-type': 'application/x-amz-json-1.1',
'content-length': '544',
'date': 'Thu, 23 Feb 2023 12:06:06 GMT'},
'RetryAttempts': 0}}
Now we can pull the workteam ARN either from create_workteam_response or describe_workteam_response.
workteam_arn = describe_workteam_response['Workteam']['WorkteamArn']
workteam_arn
'arn:aws:sagemaker:us-east-1:753124839657:workteam/private-crowd/groundtruth-workteam-1677153775'
2.5 Create an Amazon Cognito user and add the user to the group
We will use the following function to check if the Amazon Cognito user already exists.
def check_user_existence(user_pool_id, user_name):
    for user in cognito_idp.list_users(UserPoolId=user_pool_id)['Users']:
        if user_name == user['Username']:
            return True
    return False
Now we create a user passing the username, temporary password, and the Amazon Cognito user pool ID.
user_name = 'user-{}'.format(timestamp)

temporary_password = 'Password@420'

if not check_user_existence(user_pool_id, user_name):
    create_user_response = cognito_idp.admin_create_user(
        Username=user_name,
        UserPoolId=user_pool_id,
        TemporaryPassword=temporary_password,
        MessageAction='SUPPRESS' # suppress sending the invitation message to a user that already exists
    )
    pprint(create_user_response)
else:
    print("Amazon Cognito user {} already exists".format(user_name))
{'ResponseMetadata': {'HTTPHeaders': {'connection': 'keep-alive',
'content-length': '242',
'content-type': 'application/x-amz-json-1.1',
'date': 'Thu, 23 Feb 2023 12:06:07 GMT',
'x-amzn-requestid': '9799ecf1-9400-4385-a696-f3067a8ee4ab'},
'HTTPStatusCode': 200,
'RequestId': '9799ecf1-9400-4385-a696-f3067a8ee4ab',
'RetryAttempts': 0},
'User': {'Attributes': [{'Name': 'sub',
'Value': '7e22b0c1-059a-45b4-b69a-e1b378950097'}],
'Enabled': True,
'UserCreateDate': datetime.datetime(2023, 2, 23, 12, 6, 7, 848000, tzinfo=tzlocal()),
'UserLastModifiedDate': datetime.datetime(2023, 2, 23, 12, 6, 7, 848000, tzinfo=tzlocal()),
'UserStatus': 'FORCE_CHANGE_PASSWORD',
'Username': 'user-1677153775'}}
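The temporary password above is hard-coded, which is fine for a demo account that must change it on first login (note the FORCE_CHANGE_PASSWORD status). Outside a lab, a randomly generated value may be preferable. The sketch below is one possible approach; it assumes the pool's password policy asks for at least 8 characters with lowercase, uppercase, digit, and symbol characters, which should be checked against the `Policies` entry of the user pool description. The helper name and the symbol set are hypothetical choices.

```python
import secrets
import string

SYMBOLS = "!@#$%^&*"  # assumption: a subset of the symbols the pool policy accepts


def make_temporary_password(length=12):
    """Generate a random password with at least one character from each class."""
    if length < 8:
        raise ValueError("length must be at least 8")
    classes = [string.ascii_lowercase, string.ascii_uppercase, string.digits, SYMBOLS]
    # One guaranteed character per class, then fill the rest from the full alphabet
    chars = [secrets.choice(c) for c in classes]
    alphabet = "".join(classes)
    chars += [secrets.choice(alphabet) for _ in range(length - len(chars))]
    # Shuffle so the guaranteed characters do not always come first
    secrets.SystemRandom().shuffle(chars)
    return "".join(chars)


candidate = make_temporary_password()
print(len(candidate))
```

The result could then be passed as `TemporaryPassword` in place of the fixed string.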
Add the user to the Amazon Cognito user group.
cognito_idp.admin_add_user_to_group(
    UserPoolId=user_pool_id,
    Username=user_name,
    GroupName=user_pool_group_name
)
{'ResponseMetadata': {'RequestId': '18dd685f-63f6-4d5b-8f81-cd22d9304a5e',
'HTTPStatusCode': 200,
'HTTPHeaders': {'date': 'Thu, 23 Feb 2023 12:06:08 GMT',
'content-type': 'application/x-amz-json-1.1',
'content-length': '0',
'connection': 'keep-alive',
'x-amzn-requestid': '18dd685f-63f6-4d5b-8f81-cd22d9304a5e'},
'RetryAttempts': 0}}
3 Create Human Task UI
We will create a Human Task UI resource, using a worker task UI template. This template will be rendered to the human workers whenever human interaction is required.
Below is a simple template that is compatible with the current use case of classifying product reviews into three sentiment classes. For other pre-built UIs (there are 70+), check: https://github.com/aws-samples/amazon-a2i-sample-task-uis
template = r"""
<script src="https://assets.crowd.aws/crowd-html-elements.js"></script>
<crowd-form>
<crowd-classifier name="sentiment"
categories="['-1', '0', '1']"
initial-value="{{ task.input.initialValue }}"
header="Classify Reviews into Sentiment: -1 (negative), 0 (neutral), and 1 (positive)">
<classification-target>
{{ task.input.taskObject }}
</classification-target>
<full-instructions header="Classify reviews into sentiment: -1 (negative), 0 (neutral), and 1 (positive)">
<p><strong>1</strong>: joy, excitement, delight</p>
<p><strong>0</strong>: neither positive or negative, such as stating a fact</p>
<p><strong>-1</strong>: anger, sarcasm, anxiety</p>
</full-instructions>
<short-instructions>
Classify reviews into sentiment: -1 (negative), 0 (neutral), and 1 (positive)
</short-instructions>
</crowd-classifier>
</crowd-form>
"""
We will now create a human task UI resource.
# Task UI name - this value is unique per account and region.
task_ui_name = 'ui-{}'.format(timestamp)

human_task_ui_response = sm.create_human_task_ui(
    HumanTaskUiName=task_ui_name,
    UiTemplate={
        "Content": template
    }
)
human_task_ui_response
{'HumanTaskUiArn': 'arn:aws:sagemaker:us-east-1:753124839657:human-task-ui/ui-1677153775',
'ResponseMetadata': {'RequestId': 'a3561000-dec3-44de-b527-1c26ea8b443d',
'HTTPStatusCode': 200,
'HTTPHeaders': {'x-amzn-requestid': 'a3561000-dec3-44de-b527-1c26ea8b443d',
'content-type': 'application/x-amz-json-1.1',
'content-length': '89',
'date': 'Thu, 23 Feb 2023 12:06:08 GMT'},
'RetryAttempts': 0}}
Pull the ARN of the human task UI:
human_task_ui_arn = human_task_ui_response["HumanTaskUiArn"]
print(human_task_ui_arn)
arn:aws:sagemaker:us-east-1:753124839657:human-task-ui/ui-1677153775
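The template contains two placeholders, `{{ task.input.initialValue }}` and `{{ task.input.taskObject }}`, which are filled from the input content of each human loop. To see roughly what a worker will receive, the sketch below substitutes them locally. It is a plain string substitution written for illustration (the function name `preview_template` is hypothetical), not the template engine that the service itself uses.

```python
import re


def preview_template(template_text, task_input):
    """Replace {{ task.input.<key> }} placeholders with values from task_input.

    A simple local preview only; missing keys are rendered as empty strings.
    """
    pattern = re.compile(r"\{\{\s*task\.input\.(\w+)\s*\}\}")
    return pattern.sub(lambda m: str(task_input.get(m.group(1), "")), template_text)


snippet = (
    '<crowd-classifier initial-value="{{ task.input.initialValue }}">'
    "{{ task.input.taskObject }}"
    "</crowd-classifier>"
)
rendered = preview_template(snippet, {"initialValue": 1, "taskObject": "It is okay"})
print(rendered)
```

The keys `initialValue` and `taskObject` are the same ones that are sent as the human loop input later in this article.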
4 Define human review workflow
In this section, we are going to create a Flow Definition. A Flow Definition allows us to specify:
- The workforce (in fact, it is a workteam) that our tasks will be sent to.
- The instructions that our workforce will receive (worker task template).
- The configuration of our worker tasks, including the number of workers that receive a task and time limits to complete tasks.
- Where our output data will be stored.
Here we are going to use the API, but we can optionally create this workflow definition in the console as well.
For more details and instructions, see: https://docs.aws.amazon.com/sagemaker/latest/dg/a2i-create-flow-definition.html.
Let’s construct the S3 bucket output path.
output_path = 's3://{}/a2i-results-{}'.format(bucket, timestamp)
print(output_path)
s3://sagemaker-us-east-1-753124839657/a2i-results-1677153775
Let’s construct the Flow Definition, using the workteam and human task UI that we created above in the human loop configuration.
# Flow definition name - this value is unique per account and region
flow_definition_name = 'fd-{}'.format(timestamp)

create_workflow_definition_response = sm.create_flow_definition(
    FlowDefinitionName=flow_definition_name,
    RoleArn=role,
    HumanLoopConfig={
        "WorkteamArn": workteam_arn,
        "HumanTaskUiArn": human_task_ui_arn,
        "TaskCount": 1, # the number of workers that receive a task
        "TaskDescription": "Classify Reviews into sentiment: -1 (negative), 0 (neutral), 1 (positive)",
        "TaskTitle": "Classify Reviews into sentiment: -1 (negative), 0 (neutral), 1 (positive)",
    },
    OutputConfig={"S3OutputPath": output_path},
)

augmented_ai_flow_definition_arn = create_workflow_definition_response["FlowDefinitionArn"]
We can pull information about the Flow Definition with the function sm.describe_flow_definition and wait for its status value FlowDefinitionStatus to become Active.
for _ in range(60):
    describe_flow_definition_response = sm.describe_flow_definition(FlowDefinitionName=flow_definition_name)
    print(describe_flow_definition_response["FlowDefinitionStatus"])
    if describe_flow_definition_response["FlowDefinitionStatus"] == "Active":
        print("Flow Definition is active")
        break
    time.sleep(2)
Active
Flow Definition is active
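The same "poll until a status is reached" pattern appears several times in this project, so it can be useful to factor it into a small helper. The sketch below is one way to do that; the name `wait_for_status` and its parameters are hypothetical, and a fake status source is used so the example runs without an AWS account.

```python
import time


def wait_for_status(fetch_status, target, interval_seconds=2, max_attempts=60, sleep=time.sleep):
    """Call fetch_status() until it returns target.

    Returns True when the target status is seen, False if max_attempts is exhausted.
    """
    for _ in range(max_attempts):
        if fetch_status() == target:
            return True
        sleep(interval_seconds)
    return False


# Illustration with a fake status source instead of a real describe call
statuses = iter(["Initializing", "Initializing", "Active"])
became_active = wait_for_status(lambda: next(statuses), "Active", sleep=lambda seconds: None)
print(became_active)
```

With the real client, `fetch_status` could be `lambda: sm.describe_flow_definition(FlowDefinitionName=flow_definition_name)["FlowDefinitionStatus"]`. Returning False on timeout makes it explicit when the resource never became active, which the plain loop does not report.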
5 Start human loop with custom ML model
We will now deploy a custom ML model into an endpoint and call it to predict labels for some sample reviews. We need to check the confidence score for each prediction. If it is smaller than the threshold, we will engage our workforce for a human review, starting a human loop. We can fix the labels by completing the human loop tasks and review the results.
Let’s set up a sentiment predictor class to be wrapped later into the PyTorch Model.
from sagemaker.predictor import Predictor
from sagemaker.serializers import JSONLinesSerializer
from sagemaker.deserializers import JSONLinesDeserializer

class SentimentPredictor(Predictor):
    def __init__(self, endpoint_name, sagemaker_session):
        super().__init__(
            endpoint_name,
            sagemaker_session=sagemaker_session,
            serializer=JSONLinesSerializer(),
            deserializer=JSONLinesDeserializer()
        )
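The predictor exchanges data with the endpoint in JSON Lines format: one JSON document per line. The sketch below illustrates that wire format with the standard library only. The helper names are hypothetical, and the SDK serializers may differ in details, so this is meant as an explanation of the format rather than a copy of their implementation.

```python
import json


def to_json_lines(records):
    """Encode a list of dictionaries as JSON Lines text (one JSON document per line)."""
    return "\n".join(json.dumps(record) for record in records)


def from_json_lines(text):
    """Decode JSON Lines text into a list of dictionaries, skipping blank lines."""
    return [json.loads(line) for line in text.splitlines() if line.strip()]


payload = to_json_lines([{"features": ["I enjoy this product"]}])
print(payload)

# A response line of the same shape as the endpoint output shown later in this article
reply = '{"probability": 0.9376369118690491, "predicted_label": 1}'
decoded = from_json_lines(reply)
print(decoded[0]["predicted_label"])
```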
Now we create a SageMaker model based on the model artifact saved in the S3 bucket.
from sagemaker.pytorch.model import PyTorchModel

pytorch_model_name = 'model-{}'.format(timestamp)

model = PyTorchModel(name=pytorch_model_name,
                     model_data='s3://dlai-practical-data-science/models/ab/variant_a/model.tar.gz',
                     predictor_cls=SentimentPredictor,
                     entry_point='inference.py',
                     source_dir='src',
                     framework_version='1.6.0',
                     py_version='py3',
                     role=role)
Now we will create a SageMaker Endpoint from the model. For the purposes of this project, we will use a relatively small instance type. Please refer to this link for additional instance types that may work for your use cases outside of this lab.
%%time

pytorch_endpoint_name = 'endpoint-{}'.format(timestamp)

predictor = model.deploy(initial_instance_count=1,
                         instance_type='ml.m5.large',
                         endpoint_name=pytorch_endpoint_name)
----------!CPU times: user 2min 15s, sys: 9.67 s, total: 2min 24s
Wall time: 7min 24s
5.1 Start the human loop
Let’s create a list of sample reviews.
reviews = ["I enjoy this product",
           "I am unhappy with this product",
           "It is okay",
           "sometimes it works"]
Now we can send each of the sample reviews to the model via the predictor.predict() API call. Note that we need to pass the reviews in the JSON format that the model expects as input. Then, we parse the model’s response to obtain the predicted label and the confidence score.
After that, we check whether a human should review the prediction. If the returned confidence score is below the defined threshold of 90%, we start a human loop with the predicted label and the review as inputs, passing the input content and the Flow Definition defined above.
import json

human_loops_started = []

CONFIDENCE_SCORE_THRESHOLD = 0.90

for review in reviews:
    inputs = [
        {"features": [review]},
    ]

    response = predictor.predict(inputs)
    print(response)
    prediction = response[0]['predicted_label']
    confidence_score = response[0]['probability']

    print('Checking prediction confidence {} for sample review: "{}"'.format(confidence_score, review))

    # condition for when we want to engage a human for review
    if confidence_score < CONFIDENCE_SCORE_THRESHOLD:
        human_loop_name = str(time.time()).replace('.', '-') # using milliseconds
        input_content = {
            "initialValue": prediction,
            "taskObject": review
        }
        start_loop_response = a2i.start_human_loop(
            HumanLoopName=human_loop_name,
            FlowDefinitionArn=augmented_ai_flow_definition_arn,
            HumanLoopInput={"InputContent": json.dumps(input_content)},
        )

        human_loops_started.append(human_loop_name)

        print(
            f"Confidence score of {confidence_score * 100}% for prediction of {prediction} is less than the threshold of {CONFIDENCE_SCORE_THRESHOLD * 100}%"
        )
        print(f"*** ==> Starting human loop with name: {human_loop_name} \n")
    else:
        print(
            f"Confidence score of {confidence_score * 100}% for star rating of {prediction} is above threshold of {CONFIDENCE_SCORE_THRESHOLD * 100}%"
        )
        print("Human loop not needed. \n")
[{'probability': 0.9376369118690491, 'predicted_label': 1}]
Checking prediction confidence 0.9376369118690491 for sample review: "I enjoy this product"
Confidence score of 93.76369118690491% for star rating of 1 is above threshold of 90.0%
Human loop not needed.
[{'probability': 0.6340296864509583, 'predicted_label': -1}]
Checking prediction confidence 0.6340296864509583 for sample review: "I am unhappy with this product"
Confidence score of 63.402968645095825% for prediction of -1 is less than the threshold of 90.0%
*** ==> Starting human loop with name: 1677154445-9813657
[{'probability': 0.5422114729881287, 'predicted_label': 1}]
Checking prediction confidence 0.5422114729881287 for sample review: "It is okay"
Confidence score of 54.221147298812866% for prediction of 1 is less than the threshold of 90.0%
*** ==> Starting human loop with name: 1677154446-4558146
[{'probability': 0.3931102454662323, 'predicted_label': 1}]
Checking prediction confidence 0.3931102454662323 for sample review: "sometimes it works"
Confidence score of 39.31102454662323% for prediction of 1 is less than the threshold of 90.0%
*** ==> Starting human loop with name: 1677154446-8940263
Three of the sample reviews with the probability scores lower than the threshold went into the human loop. The original predicted labels are passed together with the review text and will be seen in the task.
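The routing rule itself is independent of any AWS call, so it can be isolated and checked on its own. The sketch below replays the four predictions printed above through a small function; `needs_human_review` is a hypothetical name, and the scores are copied from the output shown in this article.

```python
def needs_human_review(confidence_score, threshold=0.90):
    """Return True when the model is not confident enough and a human should review."""
    return confidence_score < threshold


# (review text, predicted label, confidence score) as printed in the output above
sample_predictions = [
    ("I enjoy this product", 1, 0.9376369118690491),
    ("I am unhappy with this product", -1, 0.6340296864509583),
    ("It is okay", 1, 0.5422114729881287),
    ("sometimes it works", 1, 0.3931102454662323),
]

to_review = [text for text, label, score in sample_predictions if needs_human_review(score)]
print(to_review)
```

Keeping the rule in a function of its own makes it easy to try a different threshold and see how many reviews would be sent to the workforce before starting any human loops.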
5.2 Check status of the human loop
The function a2i.describe_human_loop can be used to pull information about the human loop.
completed_human_loops = []
for human_loop_name in human_loops_started:
    resp = a2i.describe_human_loop(HumanLoopName=human_loop_name)
    print(f"HumanLoop Name: {human_loop_name}")
    print(f'HumanLoop Status: {resp["HumanLoopStatus"]}')
    print(f'HumanLoop Output Destination: {resp["HumanLoopOutput"]}')
    print("")

    if resp["HumanLoopStatus"] == "Completed":
        completed_human_loops.append(resp)
HumanLoop Name: 1677154445-9813657
HumanLoop Status: InProgress
HumanLoop Output Destination: {'OutputS3Uri': 's3://sagemaker-us-east-1-753124839657/a2i-results-1677153775/fd-1677153775/2023/02/23/12/14/06/1677154445-9813657/output.json'}
HumanLoop Name: 1677154446-4558146
HumanLoop Status: InProgress
HumanLoop Output Destination: {'OutputS3Uri': 's3://sagemaker-us-east-1-753124839657/a2i-results-1677153775/fd-1677153775/2023/02/23/12/14/06/1677154446-4558146/output.json'}
HumanLoop Name: 1677154446-8940263
HumanLoop Status: InProgress
HumanLoop Output Destination: {'OutputS3Uri': 's3://sagemaker-us-east-1-753124839657/a2i-results-1677153775/fd-1677153775/2023/02/23/12/14/06/1677154446-8940263/output.json'}
5.3 Complete the human loop tasks
Now we will pull the labeling UI from the workteam information to get into the human loop tasks in the AWS console.
labeling_ui = sm.describe_workteam(WorkteamName=workteam_name)["Workteam"]["SubDomain"]
print(labeling_ui)
aqa042udc1.labeling.us-east-1.sagemaker.aws
We will navigate to this link and log in with the username and temporary password defined above.
5.4 Verify that the human loops were completed by the workforce
import time

completed_human_loops = []
for human_loop_name in human_loops_started:
    resp = a2i.describe_human_loop(HumanLoopName=human_loop_name)
    print(f"HumanLoop Name: {human_loop_name}")
    print(f'HumanLoop Status: {resp["HumanLoopStatus"]}')
    print(f'HumanLoop Output Destination: {resp["HumanLoopOutput"]}')
    print("")
    while resp["HumanLoopStatus"] != "Completed":
        print(f"Waiting for HumanLoop to complete.")
        time.sleep(10)
        resp = a2i.describe_human_loop(HumanLoopName=human_loop_name)
    if resp["HumanLoopStatus"] == "Completed":
        completed_human_loops.append(resp)
        print(f"Completed!")
        print("")
HumanLoop Name: 1677154445-9813657
HumanLoop Status: InProgress
HumanLoop Output Destination: {'OutputS3Uri': 's3://sagemaker-us-east-1-753124839657/a2i-results-1677153775/fd-1677153775/2023/02/23/12/14/06/1677154445-9813657/output.json'}
Waiting for HumanLoop to complete.
Waiting for HumanLoop to complete.
Waiting for HumanLoop to complete.
Waiting for HumanLoop to complete.
Waiting for HumanLoop to complete.
Waiting for HumanLoop to complete.
Waiting for HumanLoop to complete.
Waiting for HumanLoop to complete.
Waiting for HumanLoop to complete.
Waiting for HumanLoop to complete.
Waiting for HumanLoop to complete.
Waiting for HumanLoop to complete.
Waiting for HumanLoop to complete.
Waiting for HumanLoop to complete.
Waiting for HumanLoop to complete.
Completed!
HumanLoop Name: 1677154446-4558146
HumanLoop Status: Completed
HumanLoop Output Destination: {'OutputS3Uri': 's3://sagemaker-us-east-1-753124839657/a2i-results-1677153775/fd-1677153775/2023/02/23/12/14/06/1677154446-4558146/output.json'}
Completed!
HumanLoop Name: 1677154446-8940263
HumanLoop Status: Completed
HumanLoop Output Destination: {'OutputS3Uri': 's3://sagemaker-us-east-1-753124839657/a2i-results-1677153775/fd-1677153775/2023/02/23/12/14/06/1677154446-8940263/output.json'}
Completed!
The loop above will not finish until we have labeled the data in the labeling UI, following the instructions from the previous section.
5.5 View human labels and prepare the data for re-training
Once the work is complete, Amazon A2I stores the results in the specified S3 bucket and sends a CloudWatch Event. Let’s check the S3 contents.
import re
from pprint import pprint

fixed_items = []

for resp in completed_human_loops:
    split_string = re.split("s3://" + bucket + "/", resp["HumanLoopOutput"]["OutputS3Uri"])
    output_bucket_key = split_string[1]

    response = s3.get_object(Bucket=bucket, Key=output_bucket_key)
    content = response["Body"].read().decode("utf-8")
    json_output = json.loads(content)

    pprint(json_output)

    input_content = json_output["inputContent"]
    human_answer = json_output["humanAnswers"][0]["answerContent"]
    fixed_item = {"input_content": input_content, "human_answer": human_answer}
    fixed_items.append(fixed_item)
{'flowDefinitionArn': 'arn:aws:sagemaker:us-east-1:753124839657:flow-definition/fd-1677153775',
'humanAnswers': [{'acceptanceTime': '2023-02-23T12:16:28.736Z',
'answerContent': {'sentiment': {'label': '-1'}},
'submissionTime': '2023-02-23T12:16:33.547Z',
'timeSpentInSeconds': 4.811,
'workerId': '0e31fea759d04da1',
'workerMetadata': {'identityData': {'identityProviderType': 'Cognito',
'issuer': 'https://cognito-idp.us-east-1.amazonaws.com/us-east-1_8s0SOCEPn',
'sub': '7e22b0c1-059a-45b4-b69a-e1b378950097'}}}],
'humanLoopName': '1677154445-9813657',
'inputContent': {'initialValue': -1,
'taskObject': 'I am unhappy with this product'}}
{'flowDefinitionArn': 'arn:aws:sagemaker:us-east-1:753124839657:flow-definition/fd-1677153775',
'humanAnswers': [{'acceptanceTime': '2023-02-23T12:16:06.376Z',
'answerContent': {'sentiment': {'label': '0'}},
'submissionTime': '2023-02-23T12:16:23.626Z',
'timeSpentInSeconds': 17.25,
'workerId': '0e31fea759d04da1',
'workerMetadata': {'identityData': {'identityProviderType': 'Cognito',
'issuer': 'https://cognito-idp.us-east-1.amazonaws.com/us-east-1_8s0SOCEPn',
'sub': '7e22b0c1-059a-45b4-b69a-e1b378950097'}}}],
'humanLoopName': '1677154446-4558146',
'inputContent': {'initialValue': 1, 'taskObject': 'It is okay'}}
{'flowDefinitionArn': 'arn:aws:sagemaker:us-east-1:753124839657:flow-definition/fd-1677153775',
'humanAnswers': [{'acceptanceTime': '2023-02-23T12:16:23.694Z',
'answerContent': {'sentiment': {'label': '0'}},
'submissionTime': '2023-02-23T12:16:28.668Z',
'timeSpentInSeconds': 4.974,
'workerId': '0e31fea759d04da1',
'workerMetadata': {'identityData': {'identityProviderType': 'Cognito',
'issuer': 'https://cognito-idp.us-east-1.amazonaws.com/us-east-1_8s0SOCEPn',
'sub': '7e22b0c1-059a-45b4-b69a-e1b378950097'}}}],
'humanLoopName': '1677154446-8940263',
'inputContent': {'initialValue': 1, 'taskObject': 'sometimes it works'}}
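The object key above is obtained by splitting the output URI on the known bucket name. An alternative that does not depend on knowing the bucket in advance is to parse the URI with the standard library, as in the sketch below. The helper name `split_s3_uri` is hypothetical; the URI is one of the output locations printed earlier.

```python
from urllib.parse import urlparse


def split_s3_uri(uri):
    """Split an s3://bucket/key URI into a (bucket, key) tuple."""
    parsed = urlparse(uri)
    if parsed.scheme != "s3" or not parsed.netloc:
        raise ValueError("not an S3 URI: {}".format(uri))
    return parsed.netloc, parsed.path.lstrip("/")


bucket_name, key = split_s3_uri(
    "s3://sagemaker-us-east-1-753124839657/a2i-results-1677153775/fd-1677153775/"
    "2023/02/23/12/14/06/1677154445-9813657/output.json"
)
print(bucket_name)
print(key)
```

The two values can be passed directly as `Bucket` and `Key` to `s3.get_object`.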
Now we can prepare the data for re-training.
df_fixed_items = pd.DataFrame(fixed_items)
df_fixed_items.head()
| | input_content | human_answer |
|---|---|---|
| 0 | {'initialValue': -1, 'taskObject': 'I am unhap... | {'sentiment': {'label': '-1'}} |
| 1 | {'initialValue': 1, 'taskObject': 'It is okay'} | {'sentiment': {'label': '0'}} |
| 2 | {'initialValue': 1, 'taskObject': 'sometimes i... | {'sentiment': {'label': '0'}} |
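The two columns above still hold nested dictionaries. For re-training it is usually more convenient to have one flat row per review, with the text, the model's original label, and the human label side by side. The sketch below shows one way to flatten the items; the function name and the column names `review_body`, `predicted_label`, and `human_label` are hypothetical choices, and the example items are copied from the results above.

```python
def flatten_fixed_item(item):
    """Turn one nested result into a flat row with integer labels."""
    return {
        "review_body": item["input_content"]["taskObject"],
        "predicted_label": int(item["input_content"]["initialValue"]),
        "human_label": int(item["human_answer"]["sentiment"]["label"]),
    }


example_items = [
    {"input_content": {"initialValue": -1, "taskObject": "I am unhappy with this product"},
     "human_answer": {"sentiment": {"label": "-1"}}},
    {"input_content": {"initialValue": 1, "taskObject": "It is okay"},
     "human_answer": {"sentiment": {"label": "0"}}},
    {"input_content": {"initialValue": 1, "taskObject": "sometimes it works"},
     "human_answer": {"sentiment": {"label": "0"}}},
]

rows = [flatten_fixed_item(item) for item in example_items]
# Rows where the human label differs from the model's prediction
corrected = [row for row in rows if row["predicted_label"] != row["human_label"]]
print(len(corrected))
```

The list of rows can be passed to `pd.DataFrame(rows)` to obtain a table with one column per field. Here the workforce confirmed the first label and changed the other two from positive to neutral.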
6 Acknowledgements
I’d like to express my thanks to the great Deep Learning AI Practical Data Science on AWS Specialisation Course, which I completed, and to acknowledge the use of some images and other materials from the training course in this article.