package org.apache.storm.streams.processors;

import org.apache.storm.state.KeyValueState;
import org.apache.storm.streams.Pair;
import org.apache.storm.streams.operations.StateUpdater;

/* loaded from: input_file:org/apache/storm/streams/processors/UpdateStateByKeyProcessor.class */
/**
 * Processor that keeps one state value per key and updates it for every incoming
 * key/value pair.
 *
 * <p>For each pair, the state currently stored under the pair's key is combined with the
 * pair's value through the supplied {@link StateUpdater}; the result is written back to the
 * {@link KeyValueState} and forwarded downstream as a {@code (key, newState)} pair.
 *
 * @param <K> type of the key
 * @param <V> type of the incoming value
 * @param <R> type of the per-key state
 */
public class UpdateStateByKeyProcessor<K, V, R> extends BaseProcessor<Pair<K, V>> implements StatefulProcessor<K, R> {
    /** Supplies the initial state and folds each incoming value into the current state. */
    private final StateUpdater<V, R> stateUpdater;

    /** Backing store for the per-key state; assigned in {@link #initState(KeyValueState)}. */
    private KeyValueState<K, R> keyValueState;

    /**
     * Creates a processor that updates state with the given updater.
     *
     * @param stateUpdater provides the initial state and the update function
     */
    public UpdateStateByKeyProcessor(StateUpdater<V, R> stateUpdater) {
        this.stateUpdater = stateUpdater;
    }

    /**
     * Stores the key/value state that {@link #execute(Pair)} reads from and writes to.
     *
     * @param keyValueState the state store to use
     */
    @Override
    public void initState(KeyValueState<K, R> keyValueState) {
        this.keyValueState = keyValueState;
    }

    /**
     * Updates the state for the pair's key with the pair's value and forwards the result.
     *
     * <p>NOTE(review): the decompiler reported this method as originally {@code protected};
     * the visibility is left as found here.
     *
     * @param pair the incoming key/value pair
     */
    @Override
    public void execute(Pair<K, V> pair) {
        final K key = pair.getFirst();
        final V value = pair.getSecond();

        // A null lookup result is treated as "no state yet" and replaced by the updater's initial value.
        final R stored = this.keyValueState.get(key);
        final R current = (stored != null) ? stored : this.stateUpdater.init();

        final R updated = this.stateUpdater.apply(current, value);
        this.keyValueState.put(key, updated);
        this.context.forward(Pair.of(key, updated));
    }

    /**
     * Delegates unchanged to the superclass implementation.
     *
     * @param str passed through to {@code super.punctuate}
     */
    @Override
    public void punctuate(String str) {
        super.punctuate(str);
    }

    /**
     * Delegates unchanged to the superclass implementation.
     *
     * @param processorContext passed through to {@code super.init}
     */
    @Override
    public void init(ProcessorContext processorContext) {
        super.init(processorContext);
    }
}
